[#426] Put prometheus behind pkg/metrics

Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Alex Vanin 2021-03-16 11:14:56 +03:00 committed by Leonard Lyubich
parent 3c0e47e6fd
commit b8e10571c6
19 changed files with 561 additions and 311 deletions

View file

@ -24,6 +24,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/metrics"
"github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper" "github.com/nspcc-dev/neofs-node/pkg/morph/client/container/wrapper"
nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper" nmwrapper "github.com/nspcc-dev/neofs-node/pkg/morph/client/netmap/wrapper"
@ -184,7 +185,9 @@ type cfg struct {
profiler profiler.Profiler profiler profiler.Profiler
metrics profiler.Metrics metricsServer profiler.Metrics
metricsCollector *metrics.StorageMetrics
workers []worker workers []worker
@ -362,6 +365,10 @@ func initCfg(path string) *cfg {
healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)), healthStatus: atomic.NewInt32(int32(control.HealthStatus_HEALTH_STATUS_UNDEFINED)),
} }
if viperCfg.GetBool(cfgMetricsEnable) {
c.metricsCollector = metrics.NewStorageMetrics()
}
initLocalStorage(c) initLocalStorage(c)
return c return c
@ -457,7 +464,7 @@ func initLocalStorage(c *cfg) {
ls := engine.New( ls := engine.New(
engine.WithLogger(c.log), engine.WithLogger(c.log),
engine.WithMetrics(c.viper.GetBool(cfgMetricsEnable)), engine.WithMetrics(c.metricsCollector),
) )
for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts { for _, opts := range c.cfgObject.cfgLocalStorage.shardOpts {

View file

@ -5,11 +5,13 @@ import (
) )
func initMetrics(c *cfg) { func initMetrics(c *cfg) {
c.metrics = profiler.NewMetrics(c.log, c.viper) if c.metricsCollector != nil {
c.metricsServer = profiler.NewMetrics(c.log, c.viper)
}
} }
func serveMetrics(c *cfg) { func serveMetrics(c *cfg) {
if c.metrics != nil { if c.metricsServer != nil {
c.metrics.Start(c.ctx) c.metricsServer.Start(c.ctx)
} }
} }

View file

@ -345,8 +345,8 @@ func initObjectService(c *cfg) {
) )
var firstSvc objectService.ServiceServer = aclSvc var firstSvc objectService.ServiceServer = aclSvc
if c.viper.GetBool(cfgMetricsEnable) { if c.metricsCollector != nil {
firstSvc = objectService.NewMetricCollector(aclSvc) firstSvc = objectService.NewMetricCollector(aclSvc, c.metricsCollector)
} }
objectGRPC.RegisterObjectServiceServer(c.cfgGRPC.server, objectGRPC.RegisterObjectServiceServer(c.cfgGRPC.server,

View file

@ -38,8 +38,8 @@ func (r *ListContainersRes) Containers() []*container.ID {
// ContainerSize returns sum of estimation container sizes among all shards. // ContainerSize returns sum of estimation container sizes among all shards.
func (e *StorageEngine) ContainerSize(prm *ContainerSizePrm) *ContainerSizeRes { func (e *StorageEngine) ContainerSize(prm *ContainerSizePrm) *ContainerSizeRes {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(estimateContainerSizeDuration)() defer elapsed(e.metrics.AddEstimateContainerSizeDuration)()
} }
return &ContainerSizeRes{ return &ContainerSizeRes{
@ -74,8 +74,8 @@ func (e *StorageEngine) containerSize(id *container.ID) (total uint64) {
// ListContainers returns unique container IDs presented in the engine objects. // ListContainers returns unique container IDs presented in the engine objects.
func (e *StorageEngine) ListContainers(_ *ListContainersPrm) *ListContainersRes { func (e *StorageEngine) ListContainers(_ *ListContainersPrm) *ListContainersRes {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(listContainersDuration)() defer elapsed(e.metrics.AddListContainersDuration)()
} }
return &ListContainersRes{ return &ListContainersRes{

View file

@ -30,10 +30,6 @@ func (e *StorageEngine) Init() error {
} }
} }
if e.enableMetrics {
registerMetrics()
}
return nil return nil
} }

View file

@ -27,8 +27,8 @@ func (p *DeletePrm) WithAddresses(addr ...*objectSDK.Address) *DeletePrm {
// Delete marks the objects to be removed. // Delete marks the objects to be removed.
func (e *StorageEngine) Delete(prm *DeletePrm) (*DeleteRes, error) { func (e *StorageEngine) Delete(prm *DeletePrm) (*DeleteRes, error) {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(deleteDuration)() defer elapsed(e.metrics.AddDeleteDuration)()
} }
shPrm := new(shard.InhumePrm) shPrm := new(shard.InhumePrm)

View file

@ -23,7 +23,7 @@ type Option func(*cfg)
type cfg struct { type cfg struct {
log *logger.Logger log *logger.Logger
enableMetrics bool metrics MetricRegister
} }
func defaultCfg() *cfg { func defaultCfg() *cfg {
@ -54,8 +54,8 @@ func WithLogger(l *logger.Logger) Option {
} }
} }
func WithMetrics(v bool) Option { func WithMetrics(v MetricRegister) Option {
return func(c *cfg) { return func(c *cfg) {
c.enableMetrics = v c.metrics = v
} }
} }

View file

@ -38,8 +38,8 @@ func (p *ExistsRes) Exists() bool {
// Returns any error encountered that does not allow to // Returns any error encountered that does not allow to
// unambiguously determine the presence of an object. // unambiguously determine the presence of an object.
func (e *StorageEngine) Exists(prm *ExistsPrm) (*ExistsRes, error) { func (e *StorageEngine) Exists(prm *ExistsPrm) (*ExistsRes, error) {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(existsDuration)() defer elapsed(e.metrics.AddExistsDuration)()
} }
exists, err := e.exists(prm.addr) exists, err := e.exists(prm.addr)

View file

@ -42,8 +42,8 @@ func (r *GetRes) Object() *object.Object {
// //
// Returns ErrNotFound if requested object is missing in local storage. // Returns ErrNotFound if requested object is missing in local storage.
func (e *StorageEngine) Get(prm *GetPrm) (*GetRes, error) { func (e *StorageEngine) Get(prm *GetPrm) (*GetRes, error) {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(getDuration)() defer elapsed(e.metrics.AddGetDuration)()
} }
var ( var (

View file

@ -56,8 +56,8 @@ func (r *HeadRes) Header() *object.Object {
// Returns object.ErrNotFound if requested object is missing in local storage. // Returns object.ErrNotFound if requested object is missing in local storage.
// Returns object.ErrAlreadyRemoved if requested object was inhumed. // Returns object.ErrAlreadyRemoved if requested object was inhumed.
func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) { func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(headDuration)() defer elapsed(e.metrics.AddHeadDuration)()
} }
var ( var (

View file

@ -49,8 +49,8 @@ var errInhumeFailure = errors.New("inhume operation failed")
// Inhume calls metabase. Inhume method to mark object as removed. It won't be // Inhume calls metabase. Inhume method to mark object as removed. It won't be
// removed physically from shard until `Delete` operation. // removed physically from shard until `Delete` operation.
func (e *StorageEngine) Inhume(prm *InhumePrm) (*InhumeRes, error) { func (e *StorageEngine) Inhume(prm *InhumePrm) (*InhumeRes, error) {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(inhumeDuration)() defer elapsed(e.metrics.AddInhumeDuration)()
} }
shPrm := new(shard.InhumePrm) shPrm := new(shard.InhumePrm)

View file

@ -2,112 +2,26 @@ package engine
import ( import (
"time" "time"
"github.com/prometheus/client_golang/prometheus"
) )
const ( type MetricRegister interface {
namespace = "neofs_node" AddListContainersDuration(d time.Duration)
subsystem = "engine" AddEstimateContainerSizeDuration(d time.Duration)
) AddDeleteDuration(d time.Duration)
AddExistsDuration(d time.Duration)
var ( AddGetDuration(d time.Duration)
listContainersDuration = prometheus.NewCounter(prometheus.CounterOpts{ AddHeadDuration(d time.Duration)
Namespace: namespace, AddInhumeDuration(d time.Duration)
Subsystem: subsystem, AddPutDuration(d time.Duration)
Name: "list_containers_duration", AddRangeDuration(d time.Duration)
Help: "Accumulated duration of engine list containers operations", AddSearchDuration(d time.Duration)
}) AddListObjectsDuration(d time.Duration)
estimateContainerSizeDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "estimate_container_size_duration",
Help: "Accumulated duration of engine container size estimate operations",
})
deleteDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "delete_duration",
Help: "Accumulated duration of engine delete operations",
})
existsDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "exists_duration",
Help: "Accumulated duration of engine exists operations",
})
getDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "get_duration",
Help: "Accumulated duration of engine get operations",
})
headDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "head_duration",
Help: "Accumulated duration of engine head operations",
})
inhumeDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "inhume_duration",
Help: "Accumulated duration of engine inhume operations",
})
putDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "put_duration",
Help: "Accumulated duration of engine put operations",
})
rangeDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "range_duration",
Help: "Accumulated duration of engine range operations",
})
searchDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "search_duration",
Help: "Accumulated duration of engine search operations",
})
listObjectsDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "list_objects_duration",
Help: "Accumulated duration of engine list objects operations",
})
)
func registerMetrics() {
prometheus.MustRegister(listContainersDuration)
prometheus.MustRegister(estimateContainerSizeDuration)
prometheus.MustRegister(deleteDuration)
prometheus.MustRegister(existsDuration)
prometheus.MustRegister(getDuration)
prometheus.MustRegister(headDuration)
prometheus.MustRegister(inhumeDuration)
prometheus.MustRegister(putDuration)
prometheus.MustRegister(rangeDuration)
prometheus.MustRegister(searchDuration)
prometheus.MustRegister(listObjectsDuration)
} }
func elapsed(c prometheus.Counter) func() { func elapsed(addFunc func(d time.Duration)) func() {
t := time.Now() t := time.Now()
return func() { return func() {
c.Add(float64(time.Since(t))) addFunc(time.Since(t))
} }
} }

View file

@ -34,8 +34,8 @@ func (p *PutPrm) WithObject(obj *object.Object) *PutPrm {
// Returns any error encountered that // Returns any error encountered that
// did not allow to completely save the object. // did not allow to completely save the object.
func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) { func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(putDuration)() defer elapsed(e.metrics.AddPutDuration)()
} }
_, err := e.exists(prm.obj.Address()) // todo: make this check parallel _, err := e.exists(prm.obj.Address()) // todo: make this check parallel

View file

@ -60,8 +60,8 @@ func (r *RngRes) Object() *object.Object {
// Returns ErrAlreadyRemoved if requested object is inhumed. // Returns ErrAlreadyRemoved if requested object is inhumed.
// Returns ErrRangeOutOfBounds if requested object range is out of bounds. // Returns ErrRangeOutOfBounds if requested object range is out of bounds.
func (e *StorageEngine) GetRange(prm *RngPrm) (*RngRes, error) { func (e *StorageEngine) GetRange(prm *RngPrm) (*RngRes, error) {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(rangeDuration)() defer elapsed(e.metrics.AddRangeDuration)()
} }
var ( var (

View file

@ -48,8 +48,8 @@ func (r *SelectRes) AddressList() []*object.Address {
// //
// Returns any error encountered that did not allow to completely select the objects. // Returns any error encountered that did not allow to completely select the objects.
func (e *StorageEngine) Select(prm *SelectPrm) (*SelectRes, error) { func (e *StorageEngine) Select(prm *SelectPrm) (*SelectRes, error) {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(searchDuration)() defer elapsed(e.metrics.AddSearchDuration)()
} }
addrList := make([]*object.Address, 0) addrList := make([]*object.Address, 0)
@ -98,8 +98,8 @@ func (e *StorageEngine) Select(prm *SelectPrm) (*SelectRes, error) {
// List returns `limit` available physically storage object addresses in engine. // List returns `limit` available physically storage object addresses in engine.
// If limit is zero, then returns all available object addresses. // If limit is zero, then returns all available object addresses.
func (e *StorageEngine) List(limit uint64) (*SelectRes, error) { func (e *StorageEngine) List(limit uint64) (*SelectRes, error) {
if e.enableMetrics { if e.metrics != nil {
defer elapsed(listObjectsDuration)() defer elapsed(e.metrics.AddListObjectsDuration)()
} }
addrList := make([]*object.Address, limit) addrList := make([]*object.Address, limit)

178
pkg/metrics/engine.go Normal file
View file

@ -0,0 +1,178 @@
package metrics
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
type (
engineMetrics struct {
listContainersDuration prometheus.Counter
estimateContainerSizeDuration prometheus.Counter
deleteDuration prometheus.Counter
existsDuration prometheus.Counter
getDuration prometheus.Counter
headDuration prometheus.Counter
inhumeDuration prometheus.Counter
putDuration prometheus.Counter
rangeDuration prometheus.Counter
searchDuration prometheus.Counter
listObjectsDuration prometheus.Counter
}
)
const engineSubsystem = "engine"
func newEngineMetrics() engineMetrics {
var (
listContainersDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "list_containers_duration",
Help: "Accumulated duration of engine list containers operations",
})
estimateContainerSizeDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "estimate_container_size_duration",
Help: "Accumulated duration of engine container size estimate operations",
})
deleteDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "delete_duration",
Help: "Accumulated duration of engine delete operations",
})
existsDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "exists_duration",
Help: "Accumulated duration of engine exists operations",
})
getDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "get_duration",
Help: "Accumulated duration of engine get operations",
})
headDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "head_duration",
Help: "Accumulated duration of engine head operations",
})
inhumeDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "inhume_duration",
Help: "Accumulated duration of engine inhume operations",
})
putDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "put_duration",
Help: "Accumulated duration of engine put operations",
})
rangeDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "range_duration",
Help: "Accumulated duration of engine range operations",
})
searchDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "search_duration",
Help: "Accumulated duration of engine search operations",
})
listObjectsDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: engineSubsystem,
Name: "list_objects_duration",
Help: "Accumulated duration of engine list objects operations",
})
)
return engineMetrics{
listContainersDuration: listContainersDuration,
estimateContainerSizeDuration: estimateContainerSizeDuration,
deleteDuration: deleteDuration,
existsDuration: existsDuration,
getDuration: getDuration,
headDuration: headDuration,
inhumeDuration: inhumeDuration,
putDuration: putDuration,
rangeDuration: rangeDuration,
searchDuration: searchDuration,
listObjectsDuration: listObjectsDuration,
}
}
func (m engineMetrics) register() {
prometheus.MustRegister(m.listContainersDuration)
prometheus.MustRegister(m.estimateContainerSizeDuration)
prometheus.MustRegister(m.deleteDuration)
prometheus.MustRegister(m.existsDuration)
prometheus.MustRegister(m.getDuration)
prometheus.MustRegister(m.headDuration)
prometheus.MustRegister(m.inhumeDuration)
prometheus.MustRegister(m.putDuration)
prometheus.MustRegister(m.rangeDuration)
prometheus.MustRegister(m.searchDuration)
prometheus.MustRegister(m.listObjectsDuration)
}
func (m engineMetrics) AddListContainersDuration(d time.Duration) {
m.listObjectsDuration.Add(float64(d))
}
func (m engineMetrics) AddEstimateContainerSizeDuration(d time.Duration) {
m.estimateContainerSizeDuration.Add(float64(d))
}
func (m engineMetrics) AddDeleteDuration(d time.Duration) {
m.deleteDuration.Add(float64(d))
}
func (m engineMetrics) AddExistsDuration(d time.Duration) {
m.existsDuration.Add(float64(d))
}
func (m engineMetrics) AddGetDuration(d time.Duration) {
m.getDuration.Add(float64(d))
}
func (m engineMetrics) AddHeadDuration(d time.Duration) {
m.headDuration.Add(float64(d))
}
func (m engineMetrics) AddInhumeDuration(d time.Duration) {
m.inhumeDuration.Add(float64(d))
}
func (m engineMetrics) AddPutDuration(d time.Duration) {
m.putDuration.Add(float64(d))
}
func (m engineMetrics) AddRangeDuration(d time.Duration) {
m.rangeDuration.Add(float64(d))
}
func (m engineMetrics) AddSearchDuration(d time.Duration) {
m.searchDuration.Add(float64(d))
}
func (m engineMetrics) AddListObjectsDuration(d time.Duration) {
m.listObjectsDuration.Add(float64(d))
}

21
pkg/metrics/metrics.go Normal file
View file

@ -0,0 +1,21 @@
package metrics
const namespace = "neofs_node"
type StorageMetrics struct {
objectServiceMetrics
engineMetrics
}
func NewStorageMetrics() *StorageMetrics {
objectService := newObjectServiceMetrics()
objectService.register()
engine := newEngineMetrics()
engine.register()
return &StorageMetrics{
objectServiceMetrics: objectService,
engineMetrics: engine,
}
}

256
pkg/metrics/object.go Normal file
View file

@ -0,0 +1,256 @@
package metrics
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
const objectSubsystem = "object"
type (
objectServiceMetrics struct {
getCounter prometheus.Counter
putCounter prometheus.Counter
headCounter prometheus.Counter
searchCounter prometheus.Counter
deleteCounter prometheus.Counter
rangeCounter prometheus.Counter
rangeHashCounter prometheus.Counter
getDuration prometheus.Counter
putDuration prometheus.Counter
headDuration prometheus.Counter
searchDuration prometheus.Counter
deleteDuration prometheus.Counter
rangeDuration prometheus.Counter
rangeHashDuration prometheus.Counter
putPayload prometheus.Counter
getPayload prometheus.Counter
}
)
func newObjectServiceMetrics() objectServiceMetrics {
var ( // Request counter metrics.
getCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "get_req_count",
Help: "Number of get request processed",
})
putCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "put_req_count",
Help: "Number of put request processed",
})
headCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "head_req_count",
Help: "Number of head request processed",
})
searchCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "search_req_count",
Help: "Number of search request processed",
})
deleteCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "delete_req_count",
Help: "Number of delete request processed",
})
rangeCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "range_req_count",
Help: "Number of range request processed",
})
rangeHashCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "range_hash_req_count",
Help: "Number of range hash request processed",
})
)
var ( // Request duration metrics.
getDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "get_req_duration",
Help: "Accumulated get request process duration",
})
putDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "put_req_duration",
Help: "Accumulated put request process duration",
})
headDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "head_req_duration",
Help: "Accumulated head request process duration",
})
searchDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "search_req_duration",
Help: "Accumulated search request process duration",
})
deleteDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "delete_req_duration",
Help: "Accumulated delete request process duration",
})
rangeDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "range_req_duration",
Help: "Accumulated range request process duration",
})
rangeHashDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "range_hash_req_duration",
Help: "Accumulated range hash request process duration",
})
)
var ( // Object payload metrics.
putPayload = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "put_payload",
Help: "Accumulated payload size at object put method",
})
getPayload = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: objectSubsystem,
Name: "get_payload",
Help: "Accumulated payload size at object get method",
})
)
return objectServiceMetrics{
getCounter: getCounter,
putCounter: putCounter,
headCounter: headCounter,
searchCounter: searchCounter,
deleteCounter: deleteCounter,
rangeCounter: rangeCounter,
rangeHashCounter: rangeHashCounter,
getDuration: getDuration,
putDuration: putDuration,
headDuration: headDuration,
searchDuration: searchDuration,
deleteDuration: deleteDuration,
rangeDuration: rangeDuration,
rangeHashDuration: rangeHashDuration,
putPayload: putPayload,
getPayload: getPayload,
}
}
func (m objectServiceMetrics) register() {
prometheus.MustRegister(m.getCounter)
prometheus.MustRegister(m.putCounter)
prometheus.MustRegister(m.headCounter)
prometheus.MustRegister(m.searchCounter)
prometheus.MustRegister(m.deleteCounter)
prometheus.MustRegister(m.rangeCounter)
prometheus.MustRegister(m.rangeHashCounter)
prometheus.MustRegister(m.getDuration)
prometheus.MustRegister(m.putDuration)
prometheus.MustRegister(m.headDuration)
prometheus.MustRegister(m.searchDuration)
prometheus.MustRegister(m.deleteDuration)
prometheus.MustRegister(m.rangeDuration)
prometheus.MustRegister(m.rangeHashDuration)
prometheus.MustRegister(m.putPayload)
prometheus.MustRegister(m.getPayload)
}
func (m objectServiceMetrics) IncGetReqCounter() {
m.getCounter.Inc()
}
func (m objectServiceMetrics) IncPutReqCounter() {
m.putCounter.Inc()
}
func (m objectServiceMetrics) IncHeadReqCounter() {
m.headCounter.Inc()
}
func (m objectServiceMetrics) IncSearchReqCounter() {
m.searchCounter.Inc()
}
func (m objectServiceMetrics) IncDeleteReqCounter() {
m.deleteCounter.Inc()
}
func (m objectServiceMetrics) IncRangeReqCounter() {
m.rangeCounter.Inc()
}
func (m objectServiceMetrics) IncRangeHashReqCounter() {
m.rangeHashCounter.Inc()
}
func (m objectServiceMetrics) AddGetReqDuration(d time.Duration) {
m.getDuration.Add(float64(d))
}
func (m objectServiceMetrics) AddPutReqDuration(d time.Duration) {
m.putDuration.Add(float64(d))
}
func (m objectServiceMetrics) AddHeadReqDuration(d time.Duration) {
m.headDuration.Add(float64(d))
}
func (m objectServiceMetrics) AddSearchReqDuration(d time.Duration) {
m.searchDuration.Add(float64(d))
}
func (m objectServiceMetrics) AddDeleteReqDuration(d time.Duration) {
m.deleteDuration.Add(float64(d))
}
func (m objectServiceMetrics) AddRangeReqDuration(d time.Duration) {
m.rangeDuration.Add(float64(d))
}
func (m objectServiceMetrics) AddRangeHashReqDuration(d time.Duration) {
m.rangeHashDuration.Add(float64(d))
}
func (m objectServiceMetrics) AddPutPayload(ln int) {
m.putPayload.Add(float64(ln))
}
func (m objectServiceMetrics) AddGetPayload(ln int) {
m.getPayload.Add(float64(ln))
}

View file

@ -6,198 +6,73 @@ import (
"github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-node/pkg/services/util" "github.com/nspcc-dev/neofs-node/pkg/services/util"
"github.com/prometheus/client_golang/prometheus"
) )
type ( type (
MetricCollector struct { MetricCollector struct {
next ServiceServer next ServiceServer
metrics MetricRegister
} }
getStreamMetric struct { getStreamMetric struct {
util.ServerStream util.ServerStream
stream GetObjectStream stream GetObjectStream
metrics MetricRegister
} }
putStreamMetric struct { putStreamMetric struct {
stream object.PutObjectStreamer stream object.PutObjectStreamer
metrics MetricRegister
}
MetricRegister interface {
IncGetReqCounter()
IncPutReqCounter()
IncHeadReqCounter()
IncSearchReqCounter()
IncDeleteReqCounter()
IncRangeReqCounter()
IncRangeHashReqCounter()
AddGetReqDuration(time.Duration)
AddPutReqDuration(time.Duration)
AddHeadReqDuration(time.Duration)
AddSearchReqDuration(time.Duration)
AddDeleteReqDuration(time.Duration)
AddRangeReqDuration(time.Duration)
AddRangeHashReqDuration(time.Duration)
AddPutPayload(int)
AddGetPayload(int)
} }
) )
const ( func NewMetricCollector(next ServiceServer, register MetricRegister) *MetricCollector {
namespace = "neofs_node"
subsystem = "object"
)
// Request counter metrics.
var (
getCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "get_req_count",
Help: "Number of get request processed",
ConstLabels: nil,
})
putCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "put_req_count",
Help: "Number of put request processed",
})
headCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "head_req_count",
Help: "Number of head request processed",
})
searchCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "search_req_count",
Help: "Number of search request processed",
})
deleteCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "delete_req_count",
Help: "Number of delete request processed",
})
rangeCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "range_req_count",
Help: "Number of range request processed",
})
rangeHashCounter = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "range_hash_req_count",
Help: "Number of range hash request processed",
})
)
// Request duration metrics.
var (
getDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "get_req_duration",
Help: "Accumulated get request process duration",
})
putDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "put_req_duration",
Help: "Accumulated put request process duration",
})
headDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "head_req_duration",
Help: "Accumulated head request process duration",
})
searchDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "search_req_duration",
Help: "Accumulated search request process duration",
})
deleteDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "delete_req_duration",
Help: "Accumulated delete request process duration",
})
rangeDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "range_req_duration",
Help: "Accumulated range request process duration",
})
rangeHashDuration = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "range_hash_req_duration",
Help: "Accumulated range hash request process duration",
})
)
// Object payload metrics.
var (
putPayload = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "put_payload",
Help: "Accumulated payload size at object put method",
})
getPayload = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "get_payload",
Help: "Accumulated payload size at object get method",
})
)
func registerMetrics() {
prometheus.MustRegister(getCounter) // todo: replace with for loop over map
prometheus.MustRegister(putCounter)
prometheus.MustRegister(headCounter)
prometheus.MustRegister(searchCounter)
prometheus.MustRegister(deleteCounter)
prometheus.MustRegister(rangeCounter)
prometheus.MustRegister(rangeHashCounter)
prometheus.MustRegister(getDuration)
prometheus.MustRegister(putDuration)
prometheus.MustRegister(headDuration)
prometheus.MustRegister(searchDuration)
prometheus.MustRegister(deleteDuration)
prometheus.MustRegister(rangeDuration)
prometheus.MustRegister(rangeHashDuration)
prometheus.MustRegister(putPayload)
prometheus.MustRegister(getPayload)
}
func NewMetricCollector(next ServiceServer) *MetricCollector {
registerMetrics()
return &MetricCollector{ return &MetricCollector{
next: next, next: next,
metrics: register,
} }
} }
func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) error { func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) error {
t := time.Now() t := time.Now()
defer func() { defer func() {
getCounter.Inc() m.metrics.IncGetReqCounter()
getDuration.Add(float64(time.Since(t))) m.metrics.AddGetReqDuration(time.Since(t))
}() }()
return m.next.Get(req, &getStreamMetric{ return m.next.Get(req, &getStreamMetric{
ServerStream: stream, ServerStream: stream,
stream: stream, stream: stream,
metrics: m.metrics,
}) })
} }
func (m MetricCollector) Put(ctx context.Context) (object.PutObjectStreamer, error) { func (m MetricCollector) Put(ctx context.Context) (object.PutObjectStreamer, error) {
t := time.Now() t := time.Now()
defer func() { defer func() {
putCounter.Inc() m.metrics.IncPutReqCounter()
putDuration.Add(float64(time.Since(t))) m.metrics.AddPutReqDuration(time.Since(t))
}() }()
stream, err := m.next.Put(ctx) stream, err := m.next.Put(ctx)
@ -205,14 +80,17 @@ func (m MetricCollector) Put(ctx context.Context) (object.PutObjectStreamer, err
return nil, err return nil, err
} }
return &putStreamMetric{stream: stream}, nil return &putStreamMetric{
stream: stream,
metrics: m.metrics,
}, nil
} }
func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
t := time.Now() t := time.Now()
defer func() { defer func() {
headCounter.Inc() m.metrics.IncHeadReqCounter()
headDuration.Add(float64(time.Since(t))) m.metrics.AddHeadReqDuration(time.Since(t))
}() }()
return m.next.Head(ctx, request) return m.next.Head(ctx, request)
@ -221,8 +99,8 @@ func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest)
func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error { func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream) error {
t := time.Now() t := time.Now()
defer func() { defer func() {
searchCounter.Inc() m.metrics.IncSearchReqCounter()
searchDuration.Add(float64(time.Since(t))) m.metrics.AddSearchReqDuration(time.Since(t))
}() }()
return m.next.Search(req, stream) return m.next.Search(req, stream)
@ -231,8 +109,8 @@ func (m MetricCollector) Search(req *object.SearchRequest, stream SearchStream)
func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) { func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteRequest) (*object.DeleteResponse, error) {
t := time.Now() t := time.Now()
defer func() { defer func() {
deleteCounter.Inc() m.metrics.IncDeleteReqCounter()
deleteDuration.Add(float64(time.Since(t))) m.metrics.AddDeleteReqDuration(time.Since(t))
}() }()
return m.next.Delete(ctx, request) return m.next.Delete(ctx, request)
@ -241,8 +119,8 @@ func (m MetricCollector) Delete(ctx context.Context, request *object.DeleteReque
func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
t := time.Now() t := time.Now()
defer func() { defer func() {
rangeCounter.Inc() m.metrics.IncRangeReqCounter()
rangeDuration.Add(float64(time.Since(t))) m.metrics.AddRangeReqDuration(time.Since(t))
}() }()
return m.next.GetRange(req, stream) return m.next.GetRange(req, stream)
@ -251,8 +129,8 @@ func (m MetricCollector) GetRange(req *object.GetRangeRequest, stream GetObjectR
func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
t := time.Now() t := time.Now()
defer func() { defer func() {
rangeHashCounter.Inc() m.metrics.IncRangeHashReqCounter()
rangeHashDuration.Add(float64(time.Since(t))) m.metrics.AddRangeHashReqDuration(time.Since(t))
}() }()
return m.next.GetRangeHash(ctx, request) return m.next.GetRangeHash(ctx, request)
@ -261,8 +139,7 @@ func (m MetricCollector) GetRangeHash(ctx context.Context, request *object.GetRa
func (s getStreamMetric) Send(resp *object.GetResponse) error { func (s getStreamMetric) Send(resp *object.GetResponse) error {
chunk, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartChunk) chunk, ok := resp.GetBody().GetObjectPart().(*object.GetObjectPartChunk)
if ok { if ok {
ln := len(chunk.GetChunk()) s.metrics.AddGetPayload(len(chunk.GetChunk()))
getPayload.Add(float64(ln))
} }
return s.stream.Send(resp) return s.stream.Send(resp)
@ -271,8 +148,7 @@ func (s getStreamMetric) Send(resp *object.GetResponse) error {
func (s putStreamMetric) Send(req *object.PutRequest) error { func (s putStreamMetric) Send(req *object.PutRequest) error {
chunk, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartChunk) chunk, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartChunk)
if ok { if ok {
ln := len(chunk.GetChunk()) s.metrics.AddPutPayload(len(chunk.GetChunk()))
putPayload.Add(float64(ln))
} }
return s.stream.Send(req) return s.stream.Send(req)