From 980b774af21a4bcd4428f267e7f3f3f9f25bfa1c Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Mon, 15 Mar 2021 16:09:27 +0300 Subject: [PATCH] [#426] engine: Support duration metrics With `enable metrics` option, engine will collect durations for all public methods. Signed-off-by: Alex Vanin --- pkg/local_object_storage/engine/container.go | 8 ++ pkg/local_object_storage/engine/control.go | 4 + pkg/local_object_storage/engine/delete.go | 4 + pkg/local_object_storage/engine/engine.go | 8 ++ pkg/local_object_storage/engine/exists.go | 4 + pkg/local_object_storage/engine/get.go | 4 + pkg/local_object_storage/engine/head.go | 4 + pkg/local_object_storage/engine/inhume.go | 4 + pkg/local_object_storage/engine/metrics.go | 113 +++++++++++++++++++ pkg/local_object_storage/engine/put.go | 4 + pkg/local_object_storage/engine/range.go | 4 + pkg/local_object_storage/engine/select.go | 8 ++ 12 files changed, 169 insertions(+) create mode 100644 pkg/local_object_storage/engine/metrics.go diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 11766bac4..30e13422a 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -38,6 +38,10 @@ func (r *ListContainersRes) Containers() []*container.ID { // ContainerSize returns sum of estimation container sizes among all shards. func (e *StorageEngine) ContainerSize(prm *ContainerSizePrm) *ContainerSizeRes { + if e.enableMetrics { + defer elapsed(estimateContainerSizeDuration)() + } + return &ContainerSizeRes{ size: e.containerSize(prm.cid), } @@ -70,6 +74,10 @@ func (e *StorageEngine) containerSize(id *container.ID) (total uint64) { // ListContainers returns unique container IDs presented in the engine objects. func (e *StorageEngine) ListContainers(_ *ListContainersPrm) *ListContainersRes { + if e.enableMetrics { + defer elapsed(listContainersDuration)() + } + return &ListContainersRes{ containers: e.listContainers(), } diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 72ef84bd9..fcb376096 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -30,6 +30,10 @@ func (e *StorageEngine) Init() error { } } + if e.enableMetrics { + registerMetrics() + } + return nil } diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 3755baf2c..5ab841d4a 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -27,6 +27,10 @@ func (p *DeletePrm) WithAddresses(addr ...*objectSDK.Address) *DeletePrm { // Delete marks the objects to be removed. func (e *StorageEngine) Delete(prm *DeletePrm) (*DeleteRes, error) { + if e.enableMetrics { + defer elapsed(deleteDuration)() + } + shPrm := new(shard.InhumePrm) existsPrm := new(shard.ExistsPrm) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index d86010e4a..d0d9b79a6 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -22,6 +22,8 @@ type Option func(*cfg) type cfg struct { log *logger.Logger + + enableMetrics bool } func defaultCfg() *cfg { @@ -51,3 +53,9 @@ func WithLogger(l *logger.Logger) Option { c.log = l } } + +func WithMetrics(v bool) Option { + return func(c *cfg) { + c.enableMetrics = v + } +} diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index 6febe5891..4d14be99b 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -38,6 +38,10 @@ func (p *ExistsRes) Exists() bool { // Returns any error encountered that does not allow to // unambiguously determine the presence of an object. func (e *StorageEngine) Exists(prm *ExistsPrm) (*ExistsRes, error) { + if e.enableMetrics { + defer elapsed(existsDuration)() + } + exists, err := e.exists(prm.addr) return &ExistsRes{ diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 3b37a4a27..d6ab92cec 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -42,6 +42,10 @@ func (r *GetRes) Object() *object.Object { // // Returns ErrNotFound if requested object is missing in local storage. func (e *StorageEngine) Get(prm *GetPrm) (*GetRes, error) { + if e.enableMetrics { + defer elapsed(getDuration)() + } + var ( obj *object.Object siErr *objectSDK.SplitInfoError diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 0b566466b..81e980314 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -56,6 +56,10 @@ func (r *HeadRes) Header() *object.Object { // Returns object.ErrNotFound if requested object is missing in local storage. // Returns object.ErrAlreadyRemoved if requested object was inhumed. func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) { + if e.enableMetrics { + defer elapsed(headDuration)() + } + var ( head *object.Object siErr *objectSDK.SplitInfoError diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 487e1a319..a29253a80 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -49,6 +49,10 @@ var errInhumeFailure = errors.New("inhume operation failed") // Inhume calls metabase. Inhume method to mark object as removed. It won't be // removed physically from shard until `Delete` operation. func (e *StorageEngine) Inhume(prm *InhumePrm) (*InhumeRes, error) { + if e.enableMetrics { + defer elapsed(inhumeDuration)() + } + shPrm := new(shard.InhumePrm) for i := range prm.addrs { diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go new file mode 100644 index 000000000..b51f1a4bd --- /dev/null +++ b/pkg/local_object_storage/engine/metrics.go @@ -0,0 +1,113 @@ +package engine + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + namespace = "neofs_node" + subsystem = "engine" +) + +var ( + listContainersDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "list_containers_duration", + Help: "Accumulated duration of engine list containers operations", + }) + + estimateContainerSizeDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "estimate_container_size_duration", + Help: "Accumulated duration of engine container size estimate operations", + }) + + deleteDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "delete_duration", + Help: "Accumulated duration of engine delete operations", + }) + + existsDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "exists_duration", + Help: "Accumulated duration of engine exists operations", + }) + + getDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "get_duration", + Help: "Accumulated duration of engine get operations", + }) + + headDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "head_duration", + Help: "Accumulated duration of engine head operations", + }) + + inhumeDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "inhume_duration", + Help: "Accumulated duration of engine inhume operations", + }) + + putDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "put_duration", + Help: "Accumulated duration of engine put operations", + }) + + rangeDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "range_duration", + Help: "Accumulated duration of engine range operations", + }) + + searchDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "search_duration", + Help: "Accumulated duration of engine search operations", + }) + + listObjectsDuration = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "list_objects_duration", + Help: "Accumulated duration of engine list objects operations", + }) +) + +func registerMetrics() { + prometheus.MustRegister(listContainersDuration) + prometheus.MustRegister(estimateContainerSizeDuration) + prometheus.MustRegister(deleteDuration) + prometheus.MustRegister(existsDuration) + prometheus.MustRegister(getDuration) + prometheus.MustRegister(headDuration) + prometheus.MustRegister(inhumeDuration) + prometheus.MustRegister(putDuration) + prometheus.MustRegister(rangeDuration) + prometheus.MustRegister(searchDuration) + prometheus.MustRegister(listObjectsDuration) +} + +func elapsed(c prometheus.Counter) func() { + t := time.Now() + + return func() { + c.Add(float64(time.Since(t))) + } +} diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 2d5b0926e..0abeb16fd 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -34,6 +34,10 @@ func (p *PutPrm) WithObject(obj *object.Object) *PutPrm { // Returns any error encountered that // did not allow to completely save the object. func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) { + if e.enableMetrics { + defer elapsed(putDuration)() + } + _, err := e.exists(prm.obj.Address()) // todo: make this check parallel if err != nil { return nil, err diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index f981b5e4a..fa01bc57c 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -60,6 +60,10 @@ func (r *RngRes) Object() *object.Object { // Returns ErrAlreadyRemoved if requested object is inhumed. // Returns ErrRangeOutOfBounds if requested object range is out of bounds. func (e *StorageEngine) GetRange(prm *RngPrm) (*RngRes, error) { + if e.enableMetrics { + defer elapsed(rangeDuration)() + } + var ( obj *object.Object siErr *objectSDK.SplitInfoError diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index 90e86e232..14ce8ffab 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -48,6 +48,10 @@ func (r *SelectRes) AddressList() []*object.Address { // // Returns any error encountered that did not allow to completely select the objects. func (e *StorageEngine) Select(prm *SelectPrm) (*SelectRes, error) { + if e.enableMetrics { + defer elapsed(searchDuration)() + } + addrList := make([]*object.Address, 0) uniqueMap := make(map[string]struct{}) @@ -94,6 +98,10 @@ func (e *StorageEngine) Select(prm *SelectPrm) (*SelectRes, error) { // List returns `limit` available physically storage object addresses in engine. // If limit is zero, then returns all available object addresses. func (e *StorageEngine) List(limit uint64) (*SelectRes, error) { + if e.enableMetrics { + defer elapsed(listObjectsDuration)() + } + addrList := make([]*object.Address, limit) uniqueMap := make(map[string]struct{}) ln := uint64(0)