forked from TrueCloudLab/frostfs-node
[#232] shard: Add PUT metrics
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
02831d427b
commit
4fcc81c999
9 changed files with 204 additions and 11 deletions
|
@ -24,6 +24,8 @@ type MetricRegister interface {
|
|||
|
||||
AddToContainerSize(cnrID string, size int64)
|
||||
AddToPayloadCounter(shardID string, size int64)
|
||||
|
||||
AddShardOperationDuration(shardID string, operation string, d time.Duration)
|
||||
}
|
||||
|
||||
func elapsed(addFunc func(d time.Duration)) func() {
|
||||
|
|
|
@ -2,6 +2,7 @@ package engine
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||
|
@ -60,6 +61,42 @@ func (m *metricsWithID) AddToPayloadSize(size int64) {
|
|||
m.mw.AddToPayloadCounter(m.id, size)
|
||||
}
|
||||
|
||||
func (m *metricsWithID) AddExistsDuration(start time.Time) {
|
||||
m.mw.AddShardOperationDuration(m.id, "shard.exists", time.Since(start))
|
||||
}
|
||||
|
||||
func (m *metricsWithID) AddMoveItDuration(start time.Time) {
|
||||
m.mw.AddShardOperationDuration(m.id, "shard.moveit", time.Since(start))
|
||||
}
|
||||
|
||||
func (m *metricsWithID) AddPutDuration(start time.Time) {
|
||||
m.mw.AddShardOperationDuration(m.id, "shard.put", time.Since(start))
|
||||
}
|
||||
|
||||
func (m *metricsWithID) AddExistsBlobstorDuration(start time.Time) {
|
||||
m.mw.AddShardOperationDuration(m.id, "blobstor.exists", time.Since(start))
|
||||
}
|
||||
|
||||
func (m *metricsWithID) AddExistsMetabaseDuration(start time.Time) {
|
||||
m.mw.AddShardOperationDuration(m.id, "metabase.exists", time.Since(start))
|
||||
}
|
||||
|
||||
func (m *metricsWithID) AddMoveItMetabaseDuration(start time.Time) {
|
||||
m.mw.AddShardOperationDuration(m.id, "metabase.moveit", time.Since(start))
|
||||
}
|
||||
|
||||
func (m *metricsWithID) AddPutWriteCacheDuration(start time.Time) {
|
||||
m.mw.AddShardOperationDuration(m.id, "writecache.put", time.Since(start))
|
||||
}
|
||||
|
||||
func (m *metricsWithID) AddPutBlobstoreDuration(start time.Time) {
|
||||
m.mw.AddShardOperationDuration(m.id, "blobstore.put", time.Since(start))
|
||||
}
|
||||
|
||||
func (m *metricsWithID) AddPutMetabaseDuration(start time.Time) {
|
||||
m.mw.AddShardOperationDuration(m.id, "metabase.put", time.Since(start))
|
||||
}
|
||||
|
||||
// AddShard adds a new shard to the storage engine.
|
||||
//
|
||||
// Returns any error encountered that did not allow adding a shard.
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -34,6 +36,10 @@ func (p ExistsRes) Exists() bool {
|
|||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed.
|
||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||
func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
||||
if s.metricsWriter != nil {
|
||||
defer s.metricsWriter.AddExistsDuration(time.Now())
|
||||
}
|
||||
|
||||
var exists bool
|
||||
var err error
|
||||
|
||||
|
@ -45,14 +51,14 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
|||
p.Address = prm.addr
|
||||
|
||||
var res common.ExistsRes
|
||||
res, err = s.blobStor.Exists(p)
|
||||
res, err = s.existsBlobstore(p)
|
||||
exists = res.Exists
|
||||
} else {
|
||||
var existsPrm meta.ExistsPrm
|
||||
existsPrm.SetAddress(prm.addr)
|
||||
|
||||
var res meta.ExistsRes
|
||||
res, err = s.metaBase.Exists(existsPrm)
|
||||
res, err = s.existsMetabase(existsPrm)
|
||||
exists = res.Exists()
|
||||
}
|
||||
|
||||
|
@ -60,3 +66,19 @@ func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
|||
ex: exists,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (s *Shard) existsBlobstore(prm common.ExistsPrm) (common.ExistsRes, error) {
|
||||
if s.metricsWriter != nil {
|
||||
defer s.metricsWriter.AddExistsBlobstorDuration(time.Now())
|
||||
}
|
||||
|
||||
return s.blobStor.Exists(prm)
|
||||
}
|
||||
|
||||
func (s *Shard) existsMetabase(prm meta.ExistsPrm) (meta.ExistsRes, error) {
|
||||
if s.metricsWriter != nil {
|
||||
defer s.metricsWriter.AddExistsMetabaseDuration(time.Now())
|
||||
}
|
||||
|
||||
return s.metaBase.Exists(prm)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||
|
@ -19,10 +20,11 @@ import (
|
|||
)
|
||||
|
||||
type metricsStore struct {
|
||||
objCounters map[string]uint64
|
||||
cnrSize map[string]int64
|
||||
pldSize int64
|
||||
readOnly bool
|
||||
shardCounters map[string]float64
|
||||
objCounters map[string]uint64
|
||||
cnrSize map[string]int64
|
||||
pldSize int64
|
||||
readOnly bool
|
||||
}
|
||||
|
||||
func (m metricsStore) SetShardID(_ string) {}
|
||||
|
@ -68,6 +70,42 @@ func (m *metricsStore) AddToPayloadSize(size int64) {
|
|||
m.pldSize += size
|
||||
}
|
||||
|
||||
func (m *metricsStore) AddExistsDuration(start time.Time) {
|
||||
m.shardCounters["shard.exists"] += time.Since(start).Seconds()
|
||||
}
|
||||
|
||||
func (m *metricsStore) AddMoveItDuration(start time.Time) {
|
||||
m.shardCounters["shard.moveit"] += time.Since(start).Seconds()
|
||||
}
|
||||
|
||||
func (m *metricsStore) AddPutDuration(start time.Time) {
|
||||
m.shardCounters["shard.put"] += time.Since(start).Seconds()
|
||||
}
|
||||
|
||||
func (m *metricsStore) AddExistsBlobstorDuration(start time.Time) {
|
||||
m.shardCounters["blobstor.exists"] += time.Since(start).Seconds()
|
||||
}
|
||||
|
||||
func (m *metricsStore) AddExistsMetabaseDuration(start time.Time) {
|
||||
m.shardCounters["metabase.exists"] += time.Since(start).Seconds()
|
||||
}
|
||||
|
||||
func (m *metricsStore) AddMoveItMetabaseDuration(start time.Time) {
|
||||
m.shardCounters["metabase.moveit"] += time.Since(start).Seconds()
|
||||
}
|
||||
|
||||
func (m *metricsStore) AddPutWriteCacheDuration(start time.Time) {
|
||||
m.shardCounters["writecache.put"] += time.Since(start).Seconds()
|
||||
}
|
||||
|
||||
func (m *metricsStore) AddPutBlobstoreDuration(start time.Time) {
|
||||
m.shardCounters["blobstor.put"] += time.Since(start).Seconds()
|
||||
}
|
||||
|
||||
func (m *metricsStore) AddPutMetabaseDuration(start time.Time) {
|
||||
m.shardCounters["metabase.put"] += time.Since(start).Seconds()
|
||||
}
|
||||
|
||||
const physical = "phy"
|
||||
const logical = "logic"
|
||||
|
||||
|
@ -89,6 +127,7 @@ func TestCounters(t *testing.T) {
|
|||
t.Run("defaults", func(t *testing.T) {
|
||||
require.Zero(t, mm.objCounters[physical])
|
||||
require.Zero(t, mm.objCounters[logical])
|
||||
require.Empty(t, mm.shardCounters)
|
||||
require.Empty(t, mm.cnrSize)
|
||||
require.Zero(t, mm.pldSize)
|
||||
})
|
||||
|
@ -117,6 +156,7 @@ func TestCounters(t *testing.T) {
|
|||
require.Equal(t, uint64(objNumber), mm.objCounters[logical])
|
||||
require.Equal(t, expectedSizes, mm.cnrSize)
|
||||
require.Equal(t, totalPayload, mm.pldSize)
|
||||
require.Contains(t, mm.shardCounters, "shard.put")
|
||||
})
|
||||
|
||||
t.Run("inhume_GC", func(t *testing.T) {
|
||||
|
@ -203,7 +243,8 @@ func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) {
|
|||
"phy": 0,
|
||||
"logic": 0,
|
||||
},
|
||||
cnrSize: make(map[string]int64),
|
||||
cnrSize: make(map[string]int64),
|
||||
shardCounters: make(map[string]float64),
|
||||
}
|
||||
|
||||
sh := shard.New(
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
|
@ -23,6 +25,10 @@ func (p *ToMoveItPrm) SetAddress(addr oid.Address) {
|
|||
// ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
|
||||
// another shard.
|
||||
func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
|
||||
if s.metricsWriter != nil {
|
||||
defer s.metricsWriter.AddMoveItDuration(time.Now())
|
||||
}
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
|
@ -36,7 +42,7 @@ func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
|
|||
var toMovePrm meta.ToMoveItPrm
|
||||
toMovePrm.SetAddress(prm.addr)
|
||||
|
||||
_, err := s.metaBase.ToMoveIt(toMovePrm)
|
||||
_, err := s.toMoveItMetabase(toMovePrm)
|
||||
if err != nil {
|
||||
s.log.Debug("could not mark object for shard relocation in metabase",
|
||||
zap.String("error", err.Error()),
|
||||
|
@ -45,3 +51,11 @@ func (s *Shard) ToMoveIt(prm ToMoveItPrm) (ToMoveItRes, error) {
|
|||
|
||||
return ToMoveItRes{}, nil
|
||||
}
|
||||
|
||||
func (s *Shard) toMoveItMetabase(prm meta.ToMoveItPrm) (meta.ToMoveItRes, error) {
|
||||
if s.metricsWriter != nil {
|
||||
defer s.metricsWriter.AddMoveItMetabaseDuration(time.Now())
|
||||
}
|
||||
|
||||
return s.metaBase.ToMoveIt(prm)
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package shard
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||
|
@ -30,6 +31,10 @@ func (p *PutPrm) SetObject(obj *object.Object) {
|
|||
//
|
||||
// Returns ErrReadOnlyMode error if shard is in "read-only" mode.
|
||||
func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
||||
if s.metricsWriter != nil {
|
||||
defer s.metricsWriter.AddPutDuration(time.Now())
|
||||
}
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
|
@ -54,7 +59,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
|||
// ahead of `Put` by storage engine
|
||||
tryCache := s.hasWriteCache() && !m.NoMetabase()
|
||||
if tryCache {
|
||||
res, err = s.writeCache.Put(putPrm)
|
||||
res, err = s.putToWriteCache(putPrm)
|
||||
}
|
||||
if err != nil || !tryCache {
|
||||
if err != nil {
|
||||
|
@ -62,7 +67,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
|||
zap.String("err", err.Error()))
|
||||
}
|
||||
|
||||
res, err = s.blobStor.Put(putPrm)
|
||||
res, err = s.putToBlobstore(putPrm)
|
||||
if err != nil {
|
||||
return PutRes{}, fmt.Errorf("could not put object to BLOB storage: %w", err)
|
||||
}
|
||||
|
@ -72,7 +77,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
|||
var pPrm meta.PutPrm
|
||||
pPrm.SetObject(prm.obj)
|
||||
pPrm.SetStorageID(res.StorageID)
|
||||
if _, err := s.metaBase.Put(pPrm); err != nil {
|
||||
if _, err := s.putToMetabase(pPrm); err != nil {
|
||||
// may we need to handle this case in a special way
|
||||
// since the object has been successfully written to BlobStor
|
||||
return PutRes{}, fmt.Errorf("could not put object to metabase: %w", err)
|
||||
|
@ -85,3 +90,27 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
|
|||
|
||||
return PutRes{}, nil
|
||||
}
|
||||
|
||||
func (s *Shard) putToWriteCache(prm common.PutPrm) (common.PutRes, error) {
|
||||
if s.metricsWriter != nil {
|
||||
defer s.metricsWriter.AddPutWriteCacheDuration(time.Now())
|
||||
}
|
||||
|
||||
return s.writeCache.Put(prm)
|
||||
}
|
||||
|
||||
func (s *Shard) putToBlobstore(prm common.PutPrm) (common.PutRes, error) {
|
||||
if s.metricsWriter != nil {
|
||||
defer s.metricsWriter.AddPutBlobstoreDuration(time.Now())
|
||||
}
|
||||
|
||||
return s.blobStor.Put(prm)
|
||||
}
|
||||
|
||||
func (s *Shard) putToMetabase(prm meta.PutPrm) (meta.PutRes, error) {
|
||||
if s.metricsWriter != nil {
|
||||
defer s.metricsWriter.AddPutMetabaseDuration(time.Now())
|
||||
}
|
||||
|
||||
return s.metaBase.Put(prm)
|
||||
}
|
||||
|
|
|
@ -70,6 +70,19 @@ type MetricsWriter interface {
|
|||
SetShardID(id string)
|
||||
// SetReadonly must set shard readonly state.
|
||||
SetReadonly(readonly bool)
|
||||
|
||||
AddExistsDuration(start time.Time)
|
||||
AddMoveItDuration(start time.Time)
|
||||
AddPutDuration(start time.Time)
|
||||
|
||||
AddExistsBlobstorDuration(start time.Time)
|
||||
AddExistsMetabaseDuration(start time.Time)
|
||||
|
||||
AddMoveItMetabaseDuration(start time.Time)
|
||||
|
||||
AddPutWriteCacheDuration(start time.Time)
|
||||
AddPutBlobstoreDuration(start time.Time)
|
||||
AddPutMetabaseDuration(start time.Time)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
|
|
|
@ -8,6 +8,7 @@ type NodeMetrics struct {
|
|||
objectServiceMetrics
|
||||
engineMetrics
|
||||
stateMetrics
|
||||
shardMetrics
|
||||
epoch prometheus.Gauge
|
||||
}
|
||||
|
||||
|
@ -21,6 +22,9 @@ func NewNodeMetrics() *NodeMetrics {
|
|||
state := newStateMetrics()
|
||||
state.register()
|
||||
|
||||
shard := newShardMetrics()
|
||||
shard.register()
|
||||
|
||||
epoch := prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: innerRingSubsystem,
|
||||
|
@ -33,6 +37,7 @@ func NewNodeMetrics() *NodeMetrics {
|
|||
objectServiceMetrics: objectService,
|
||||
engineMetrics: engine,
|
||||
stateMetrics: state,
|
||||
shardMetrics: shard,
|
||||
epoch: epoch,
|
||||
}
|
||||
}
|
||||
|
|
30
pkg/metrics/shard.go
Normal file
30
pkg/metrics/shard.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type shardMetrics struct {
|
||||
shardOperationDuration *prometheus.HistogramVec
|
||||
}
|
||||
|
||||
func newShardMetrics() shardMetrics {
|
||||
return shardMetrics{
|
||||
shardOperationDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: "shard",
|
||||
Name: "operation_duration_seconds",
|
||||
Help: "Accumulated shard operations duration",
|
||||
}, []string{"id", "operation"}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *shardMetrics) register() {
|
||||
prometheus.MustRegister(s.shardOperationDuration)
|
||||
}
|
||||
|
||||
func (s *shardMetrics) AddShardOperationDuration(shardID string, operation string, d time.Duration) {
|
||||
s.shardOperationDuration.WithLabelValues(shardID, operation).Observe(d.Seconds())
|
||||
}
|
Loading…
Reference in a new issue