WIP: shard: Add PUT metrics #232

Closed
dstepanov-yadro wants to merge 1 commit from dstepanov-yadro/frostfs-node:object-3062 into master
9 changed files with 204 additions and 11 deletions

View File

@@ -24,6 +24,8 @@ type MetricRegister interface {
AddToContainerSize(cnrID string, size int64)
AddToPayloadCounter(shardID string, size int64)
AddShardOperationDuration(shardID string, operation string, d time.Duration)
}
func elapsed(addFunc func(d time.Duration)) func() {
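The hunk cuts off at the signature of the engine's existing `elapsed` helper; its body is not shown in this diff. A helper with this signature is typically a closure that captures the start time and reports the elapsed duration when invoked via defer — a sketch under that assumption, not the verbatim upstream code:

```go
// Sketch: capture the start time now; the returned closure, usually
// invoked via defer, passes the elapsed duration to addFunc.
func elapsed(addFunc func(d time.Duration)) func() {
	t := time.Now()
	return func() {
		addFunc(time.Since(t))
	}
}
```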

View File

@@ -2,6 +2,7 @@ package engine
import (
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
@@ -60,6 +61,42 @@ func (m *metricsWithID) AddToPayloadSize(size int64) {
m.mw.AddToPayloadCounter(m.id, size)
}
func (m *metricsWithID) AddExistsDuration(start time.Time) {
m.mw.AddShardOperationDuration(m.id, "shard.exists", time.Since(start))
}
func (m *metricsWithID) AddMoveItDuration(start time.Time) {
m.mw.AddShardOperationDuration(m.id, "shard.moveit", time.Since(start))
}
func (m *metricsWithID) AddPutDuration(start time.Time) {
m.mw.AddShardOperationDuration(m.id, "shard.put", time.Since(start))
}
func (m *metricsWithID) AddExistsBlobstorDuration(start time.Time) {
m.mw.AddShardOperationDuration(m.id, "blobstor.exists", time.Since(start))
}
func (m *metricsWithID) AddExistsMetabaseDuration(start time.Time) {
m.mw.AddShardOperationDuration(m.id, "metabase.exists", time.Since(start))
}
func (m *metricsWithID) AddMoveItMetabaseDuration(start time.Time) {
m.mw.AddShardOperationDuration(m.id, "metabase.moveit", time.Since(start))
}
func (m *metricsWithID) AddPutWriteCacheDuration(start time.Time) {
m.mw.AddShardOperationDuration(m.id, "writecache.put", time.Since(start))
}
func (m *metricsWithID) AddPutBlobstoreDuration(start time.Time) {
m.mw.AddShardOperationDuration(m.id, "blobstore.put", time.Since(start))
}
func (m *metricsWithID) AddPutMetabaseDuration(start time.Time) {
m.mw.AddShardOperationDuration(m.id, "metabase.put", time.Since(start))
}
// AddShard adds a new shard to the storage engine.
//
// Returns any error encountered that prevented the shard from being added.

View File

@@ -1,6 +1,8 @@
package shard
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@@ -34,6 +36,10 @@ func (p ExistsRes) Exists() bool {
// Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed.
// Returns object.ErrObjectIsExpired if the object is present but already expired.
func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
if s.metricsWriter != nil {
defer s.metricsWriter.AddExistsDuration(time.Now())
}
var exists bool
var err error
@@ -45,14 +51,14 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
p.Address = prm.addr
var res common.ExistsRes
-res, err = s.blobStor.Exists(p)
+res, err = s.existsBlobstore(p)
exists = res.Exists
} else {
var existsPrm meta.ExistsPrm
existsPrm.SetAddress(prm.addr)
var res meta.ExistsRes
-res, err = s.metaBase.Exists(existsPrm)
+res, err = s.existsMetabase(existsPrm)
exists = res.Exists()
}
@@ -60,3 +66,19 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
ex: exists,
}, err
}
func (s *Shard) existsBlobstore(prm common.ExistsPrm) (common.ExistsRes, error) {
if s.metricsWriter != nil {
defer s.metricsWriter.AddExistsBlobstorDuration(time.Now())
}
return s.blobStor.Exists(prm)
}
func (s *Shard) existsMetabase(prm meta.ExistsPrm) (meta.ExistsRes, error) {
if s.metricsWriter != nil {
defer s.metricsWriter.AddExistsMetabaseDuration(time.Now())
}
return s.metaBase.Exists(prm)
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"path/filepath"
"testing"
"time"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
@@ -19,10 +20,11 @@ import (
)
type metricsStore struct {
-objCounters map[string]uint64
-cnrSize     map[string]int64
-pldSize     int64
-readOnly    bool
+shardCounters map[string]float64
+objCounters   map[string]uint64
+cnrSize       map[string]int64
+pldSize       int64
+readOnly      bool
}
func (m metricsStore) SetShardID(_ string) {}
@@ -68,6 +70,42 @@ func (m *metricsStore) AddToPayloadSize(size int64) {
m.pldSize += size
}
func (m *metricsStore) AddExistsDuration(start time.Time) {
m.shardCounters["shard.exists"] += time.Since(start).Seconds()
}
func (m *metricsStore) AddMoveItDuration(start time.Time) {
m.shardCounters["shard.moveit"] += time.Since(start).Seconds()
}
func (m *metricsStore) AddPutDuration(start time.Time) {
m.shardCounters["shard.put"] += time.Since(start).Seconds()
}
func (m *metricsStore) AddExistsBlobstorDuration(start time.Time) {
m.shardCounters["blobstor.exists"] += time.Since(start).Seconds()
}
func (m *metricsStore) AddExistsMetabaseDuration(start time.Time) {
m.shardCounters["metabase.exists"] += time.Since(start).Seconds()
}
func (m *metricsStore) AddMoveItMetabaseDuration(start time.Time) {
m.shardCounters["metabase.moveit"] += time.Since(start).Seconds()
}
func (m *metricsStore) AddPutWriteCacheDuration(start time.Time) {
m.shardCounters["writecache.put"] += time.Since(start).Seconds()
}
func (m *metricsStore) AddPutBlobstoreDuration(start time.Time) {
m.shardCounters["blobstor.put"] += time.Since(start).Seconds()
}
func (m *metricsStore) AddPutMetabaseDuration(start time.Time) {
m.shardCounters["metabase.put"] += time.Since(start).Seconds()
}
const physical = "phy"
const logical = "logic"
@@ -89,6 +127,7 @@ func TestCounters(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
require.Zero(t, mm.objCounters[physical])
require.Zero(t, mm.objCounters[logical])
require.Empty(t, mm.shardCounters)
require.Empty(t, mm.cnrSize)
require.Zero(t, mm.pldSize)
})
@@ -117,6 +156,7 @@ func TestCounters(t *testing.T) {
require.Equal(t, uint64(objNumber), mm.objCounters[logical])
require.Equal(t, expectedSizes, mm.cnrSize)
require.Equal(t, totalPayload, mm.pldSize)
require.Contains(t, mm.shardCounters, "shard.put")
})
t.Run("inhume_GC", func(t *testing.T) {
@@ -203,7 +243,8 @@ func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) {
"phy": 0,
"logic": 0,
},
-cnrSize: make(map[string]int64),
+cnrSize:       make(map[string]int64),
+shardCounters: make(map[string]float64),
}
sh := shard.New(

View File

@@ -1,6 +1,8 @@
package shard
import (
"time"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
@@ -23,6 +25,10 @@ func (p *ToMoveItPrm) SetAddress(addr oid.Address) {
// ToMoveIt calls the metabase.ToMoveIt method to mark the object as relocatable
// to another shard.
func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
if s.metricsWriter != nil {
defer s.metricsWriter.AddMoveItDuration(time.Now())
}
s.m.RLock()
defer s.m.RUnlock()
@@ -36,7 +42,7 @@ func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
var toMovePrm meta.ToMoveItPrm
toMovePrm.SetAddress(prm.addr)
-_, err := s.metaBase.ToMoveIt(toMovePrm)
+_, err := s.toMoveItMetabase(toMovePrm)
if err != nil {
s.log.Debug("could not mark object for shard relocation in metabase",
zap.String("error", err.Error()),
@@ -45,3 +51,11 @@ func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
return ToMoveItRes{}, nil
}
func (s *Shard) toMoveItMetabase(prm meta.ToMoveItPrm) (meta.ToMoveItRes, error) {
if s.metricsWriter != nil {
defer s.metricsWriter.AddMoveItMetabaseDuration(time.Now())
}
return s.metaBase.ToMoveIt(prm)
}

View File

@@ -2,6 +2,7 @@ package shard
import (
"fmt"
"time"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -30,6 +31,10 @@ func (p *PutPrm) SetObject(obj *object.Object) {
//
// Returns the ErrReadOnlyMode error if the shard is in "read-only" mode.
func (s *Shard) Put(prm PutPrm) (PutRes, error) {
if s.metricsWriter != nil {
defer s.metricsWriter.AddPutDuration(time.Now())
}
s.m.RLock()
defer s.m.RUnlock()
@@ -54,7 +59,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
// ahead of `Put` by storage engine
tryCache := s.hasWriteCache() && !m.NoMetabase()
if tryCache {
-res, err = s.writeCache.Put(putPrm)
+res, err = s.putToWriteCache(putPrm)
}
if err != nil || !tryCache {
if err != nil {
@@ -62,7 +67,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
zap.String("err", err.Error()))
}
-res, err = s.blobStor.Put(putPrm)
+res, err = s.putToBlobstore(putPrm)
if err != nil {
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
}
@@ -72,7 +77,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
var pPrm meta.PutPrm
pPrm.SetObject(prm.obj)
pPrm.SetStorageID(res.StorageID)
-if _, err := s.metaBase.Put(pPrm); err != nil {
+if _, err := s.putToMetabase(pPrm); err != nil {
// maybe we need to handle this case in a special way
// since the object has been successfully written to BlobStor
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
@@ -85,3 +90,27 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
return PutRes{}, nil
}
func (s *Shard) putToWriteCache(prm common.PutPrm) (common.PutRes, error) {
if s.metricsWriter != nil {
defer s.metricsWriter.AddPutWriteCacheDuration(time.Now())
}
return s.writeCache.Put(prm)
}
func (s *Shard) putToBlobstore(prm common.PutPrm) (common.PutRes, error) {
if s.metricsWriter != nil {
defer s.metricsWriter.AddPutBlobstoreDuration(time.Now())
}
return s.blobStor.Put(prm)
}
func (s *Shard) putToMetabase(prm meta.PutPrm) (meta.PutRes, error) {
if s.metricsWriter != nil {
defer s.metricsWriter.AddPutMetabaseDuration(time.Now())
}
return s.metaBase.Put(prm)
}

View File

@@ -70,6 +70,19 @@ type MetricsWriter interface {
SetShardID(id string)
// SetReadonly must set shard readonly state.
SetReadonly(readonly bool)
// AddExistsDuration, AddMoveItDuration and AddPutDuration must record the
// total duration of the corresponding shard operation, measured from start.
AddExistsDuration(start time.Time)
AddMoveItDuration(start time.Time)
AddPutDuration(start time.Time)
// The remaining methods must record the duration of the named sub-operation
// (blobstor, metabase or write-cache access), measured from start.
AddExistsBlobstorDuration(start time.Time)
AddExistsMetabaseDuration(start time.Time)
AddMoveItMetabaseDuration(start time.Time)
AddPutWriteCacheDuration(start time.Time)
AddPutBlobstoreDuration(start time.Time)
AddPutMetabaseDuration(start time.Time)
}
type cfg struct {
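Not visible in this diff is how `s.metricsWriter` gets populated; presumably it is stored in `cfg` via a shard option along the lines of the sketch below. The option name and the functional-option signature are assumptions for illustration, not taken from this change:

```go
// Hypothetical wiring sketch, assuming the shard package's usual
// functional-option pattern (Option == func(*cfg)) and a metricsWriter
// field on cfg; the real option may be named differently.
func WithMetricsWriter(mw MetricsWriter) Option {
	return func(c *cfg) {
		c.metricsWriter = mw
	}
}
```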

View File

@@ -8,6 +8,7 @@ type NodeMetrics struct {
objectServiceMetrics
engineMetrics
stateMetrics
shardMetrics
epoch prometheus.Gauge
}
@@ -21,6 +22,9 @@ func NewNodeMetrics() *NodeMetrics {
state := newStateMetrics()
state.register()
shard := newShardMetrics()
shard.register()
epoch := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: innerRingSubsystem,
@@ -33,6 +37,7 @@ func NewNodeMetrics() *NodeMetrics {
objectServiceMetrics: objectService,
engineMetrics: engine,
stateMetrics: state,
shardMetrics: shard,
epoch: epoch,
}
}

View File

@@ -0,0 +1,30 @@
package metrics
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
type shardMetrics struct {
shardOperationDuration *prometheus.HistogramVec
}
func newShardMetrics() shardMetrics {
return shardMetrics{
shardOperationDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: "shard",
Name: "operation_duration_seconds",
Help: "Accumulated shard operations duration",
}, []string{"id", "operation"}),
}
}
func (s *shardMetrics) register() {
prometheus.MustRegister(s.shardOperationDuration)
}
func (s *shardMetrics) AddShardOperationDuration(shardID string, operation string, d time.Duration) {
s.shardOperationDuration.WithLabelValues(shardID, operation).Observe(d.Seconds())
}