Write cache metrics #378
@@ -2,6 +2,8 @@ package engine

import (
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
)

type MetricRegister interface {

@@ -24,6 +26,8 @@ type MetricRegister interface {
	AddToContainerSize(cnrID string, size int64)
	AddToPayloadCounter(shardID string, size int64)

	WriteCache() metrics.WriteCacheMetrics
}

fyrchik marked this conversation as resolved (outdated)

func elapsed(addFunc func(d time.Duration)) func() {
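
Note: the hunk above ends at the `elapsed` helper, whose body is not shown in this diff. Helpers of this shape conventionally capture the start time and report the elapsed duration through the supplied callback when the returned closure is deferred; a minimal sketch of that pattern (assumed shape, not necessarily the exact code in this file):

```go
func elapsed(addFunc func(d time.Duration)) func() {
	t := time.Now()

	return func() {
		// report how long the surrounding operation took
		addFunc(time.Since(t))
	}
}
```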

@@ -8,6 +8,7 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/hrw"
	"github.com/google/uuid"

@@ -98,6 +99,12 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
				mw: e.metrics,
			},
		))
		opts = append(opts, shard.WithExtraWriteCacheOptions(writecache.WithMetrics(
			&writeCacheMetrics{
				shardID: id.String(),
				metrics: e.metrics.WriteCache(),
			},
		)))
	}

	e.mtx.RUnlock()

@@ -2,9 +2,13 @@ package engine

import (
	"context"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

@@ -52,3 +56,52 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
	return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
}

type writeCacheMetrics struct {
	shardID string
	metrics metrics.WriteCacheMetrics
}

func (m *writeCacheMetrics) Get(d time.Duration, success bool, st writecache.StorageType) {
	m.metrics.AddGetDuration(m.shardID, success, d)
	m.metrics.IncGetCounter(m.shardID, success, st.String())
}

func (m *writeCacheMetrics) Delete(d time.Duration, success bool, st writecache.StorageType) {
	m.metrics.AddDeleteDuration(m.shardID, success, d)
	m.metrics.IncDeleteCounter(m.shardID, success, st.String())
	if success {
		m.metrics.DecActualCount(m.shardID, st.String())
	}
}

func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.StorageType) {
	m.metrics.AddPutDuration(m.shardID, success, d)
	m.metrics.IncPutCounter(m.shardID, success, st.String())
	if success {
		m.metrics.IncActualCount(m.shardID, st.String())
	}
}

func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) {
	m.metrics.SetEstimateSize(m.shardID, db, writecache.StorageTypeDB.String())
	m.metrics.SetEstimateSize(m.shardID, fstree, writecache.StorageTypeFSTree.String())
}

func (m *writeCacheMetrics) SetMode(mode mode.Mode) {
	m.metrics.SetMode(m.shardID, mode.String())
}

func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) {
	m.metrics.SetActualCount(m.shardID, db, writecache.StorageTypeDB.String())
	m.metrics.SetActualCount(m.shardID, fstree, writecache.StorageTypeFSTree.String())
}

func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) {
	m.metrics.IncFlushCounter(m.shardID, success, st.String())
}

func (m *writeCacheMetrics) Evict(st writecache.StorageType) {
	m.metrics.DecActualCount(m.shardID, st.String())
	m.metrics.IncEvictCounter(m.shardID, st.String())
}
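
In short, the writeCacheMetrics type above is the engine-side adapter for a single shard: it satisfies the write cache's Metrics interface and forwards every event to the node-level metrics.WriteCacheMetrics registry with the owning shard's ID attached, which is what produces the per-shard labels on the Prometheus side.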

@@ -187,6 +187,13 @@ func WithWriteCacheOptions(opts ...writecache.Option) Option {
	}
}

// WithExtraWriteCacheOptions returns option to add extra write cache options.
func WithExtraWriteCacheOptions(opts ...writecache.Option) Option {
	return func(c *cfg) {
		c.writeCacheOpts = append(c.writeCacheOpts, opts...)
	}
}

// WithPiloramaOptions returns option to set internal write cache options.
func WithPiloramaOptions(opts ...pilorama.Option) Option {
	return func(c *cfg) {
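
Because WithExtraWriteCacheOptions appends to the write-cache option slice rather than setting it, the per-shard metrics option attached in createShard above composes with whatever write-cache options were already configured.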

@@ -24,7 +24,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
	defer span.End()

	deleted := false
-	storageType := storageTypeUndefined
+	storageType := StorageTypeUndefined
	startedAt := time.Now()
	defer func() {
		c.metrics.Delete(time.Since(startedAt), deleted, storageType)

@@ -46,7 +46,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
	})

	if dataSize > 0 {
-		storageType = storageTypeDB
+		storageType = StorageTypeDB
		err := c.db.Update(func(tx *bbolt.Tx) error {
			b := tx.Bucket(defaultBucket)
			err := b.Delete([]byte(saddr))

@@ -65,7 +65,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
		return nil
	}

-	storageType = storageTypeFSTree
+	storageType = StorageTypeFSTree
	_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
	if err == nil {
		storagelog.Write(c.log,

@@ -199,7 +199,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
		return err
	}

-	err = c.flushObject(ctx, &obj, data, storageTypeFSTree)
+	err = c.flushObject(ctx, &obj, data, StorageTypeFSTree)
	if err != nil {
		if ignoreErrors {
			return nil

@@ -228,7 +228,7 @@ func (c *cache) workerFlushSmall() {
			return
		}

-		err := c.flushObject(context.TODO(), obj, nil, storageTypeDB)
+		err := c.flushObject(context.TODO(), obj, nil, StorageTypeDB)
		if err != nil {
			// Error is handled in flushObject.
			continue

@@ -239,7 +239,7 @@ func (c *cache) workerFlushSmall() {
}

// flushObject is used to write object directly to the main storage.
-func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte, st storageType) error {
+func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte, st StorageType) error {
	var err error

fyrchik marked this conversation as resolved (outdated)
fyrchik commented: What about declaring `err` here instead?
dstepanov-yadro commented: done

	defer func() {

@@ -319,7 +319,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
		return err
	}

-	if err := c.flushObject(ctx, &obj, data, storageTypeDB); err != nil {
+	if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
		return err
	}
}

@@ -33,7 +33,7 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e

func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) {
	found := false
-	storageType := storageTypeUndefined
+	storageType := StorageTypeUndefined
	startedAt := time.Now()
	defer func() {
		c.metrics.Get(time.Since(startedAt), found, storageType)

@@ -43,7 +43,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
	if err == nil {
		obj := objectSDK.New()
		found = true
-		storageType = storageTypeDB
+		storageType = StorageTypeDB
		return obj, obj.Unmarshal(value)
	}

@@ -53,7 +53,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address)
	}

	found = true
-	storageType = storageTypeFSTree
+	storageType = StorageTypeFSTree
	return res.Object, nil
}

@@ -6,37 +6,44 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
)

-type storageType string
+type StorageType string
+
+func (t StorageType) String() string {
+	return string(t)
+}

const (
-	storageTypeUndefined storageType = "null"
-	storageTypeDB storageType = "db"
-	storageTypeFSTree storageType = "fstree"
+	StorageTypeUndefined StorageType = "null"

fyrchik marked this conversation as resolved (outdated)
fyrchik commented: Does it have any specific meaning for the person who views metrics?
dstepanov-yadro commented: This value is used when success = false, so replaced with `null` value
fyrchik commented: Hm, so we can have failed counter increasing both for "any fail" and "fail specific to some storage type"?
fyrchik commented: I mean may be it's better to have 1 total for everything and specific counters which _could_ increase together with the total one?
dstepanov-yadro commented: Why it is better? Now this case can be resolved with selector {success=false}.

+	StorageTypeDB StorageType = "db"

fyrchik marked this conversation as resolved (outdated)
fyrchik commented: Why are they exported?
dstepanov-yadro commented: they are used in `engine` pkg

+	StorageTypeFSTree StorageType = "fstree"
)

type Metrics interface {
-	Get(d time.Duration, success bool, st storageType)
-	Delete(d time.Duration, success bool, st storageType)
-	Put(d time.Duration, success bool, st storageType)
-	Flush(success bool, st storageType)
-	Evict(st storageType)
+	Get(d time.Duration, success bool, st StorageType)
+	Delete(d time.Duration, success bool, st StorageType)
+	Put(d time.Duration, success bool, st StorageType)
+	Flush(success bool, st StorageType)
+	Evict(st StorageType)

-	Estimate(db, fstree uint64)
+	SetEstimateSize(db, fstree uint64)
	SetMode(m mode.Mode)
+	SetActualCounters(db, fstree uint64)
}

type metricsStub struct{}

-func (s *metricsStub) Get(time.Duration, bool, storageType) {}
+func (s *metricsStub) Get(time.Duration, bool, StorageType) {}

ale64bit marked this conversation as resolved (outdated)
ale64bit commented: you can just delete all placeholders, since none is used, e.g.
```
func (s *metricsStub) Get(time.Duration, bool, StorageType) {}
```
Same elsewhere.
dstepanov-yadro commented: fixed

-func (s *metricsStub) Delete(time.Duration, bool, storageType) {}
+func (s *metricsStub) Delete(time.Duration, bool, StorageType) {}

-func (s *metricsStub) Put(time.Duration, bool, storageType) {}
+func (s *metricsStub) Put(time.Duration, bool, StorageType) {}

-func (s *metricsStub) Estimate(uint64, uint64) {}
+func (s *metricsStub) SetEstimateSize(uint64, uint64) {}

func (s *metricsStub) SetMode(mode.Mode) {}

-func (s *metricsStub) Flush(bool, storageType) {}
+func (s *metricsStub) SetActualCounters(uint64, uint64) {}

-func (s *metricsStub) Evict(storageType) {}
+func (s *metricsStub) Flush(bool, StorageType) {}

+func (s *metricsStub) Evict(StorageType) {}
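
A note on the `{success=false}` selector mentioned in the thread above: the get/put/delete request counters all carry success and storage labels, so a selector such as `writecache_get_req_count{success="false"}` (the full metric name also carries the package's namespace prefix) already sums failed requests across storage types, including the ones recorded under storage="null", which is why a separate total-failures counter was not added.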

@@ -36,7 +36,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro

	startedAt := time.Now()
	added := false

ale64bit marked this conversation as resolved (outdated)
ale64bit commented: hmm, is it really better having these two vars assigned than just calling the `c.metrics.Put` where they are assigned? Following the flow seems a bit messier this way, IMHO.
dstepanov-yadro commented: It seems to me that it's easier this way: negative case by default, but for success way we need to change vars.

-	storageType := storageTypeUndefined
+	storageType := StorageTypeUndefined
	defer func() {
		c.metrics.Put(time.Since(startedAt), added, storageType)
	}()

@@ -59,7 +59,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
	}

	if sz <= c.smallObjectSize {
-		storageType = storageTypeDB
+		storageType = StorageTypeDB
		err := c.putSmall(oi)
		if err == nil {
			added = true

@@ -67,7 +67,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
		return common.PutRes{}, err
	}

-	storageType = storageTypeFSTree
+	storageType = StorageTypeFSTree
	err := c.putBig(ctx, oi.addr, prm)
	if err == nil {
		added = true
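
For readers following the thread above, this is the pattern being discussed, condensed into a standalone, hypothetical example (the names below are illustrative and do not come from the PR): the defaults describe the failure case, the happy path flips them, and the metric is emitted exactly once in a single deferred call.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type storageType string

const (
	storageUndefined storageType = "null"
	storageDB        storageType = "db"
)

// recordPut stands in for the real metrics sink.
func recordPut(d time.Duration, success bool, st storageType) {
	fmt.Printf("put: duration=%v success=%v storage=%s\n", d, success, st)
}

func put(small bool) error {
	startedAt := time.Now()
	added := false          // pessimistic defaults: failure, storage unknown
	st := storageUndefined
	defer func() {
		// emitted exactly once, on every return path
		recordPut(time.Since(startedAt), added, st)
	}()

	if small {
		st = storageDB // target storage is known before the attempt
		added = true   // flipped only on the success path
		return nil
	}
	return errors.New("big objects not handled in this sketch")
}

func main() {
	_ = put(true)
	_ = put(false)
}
```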

@@ -11,7 +11,7 @@ import (
func (c *cache) estimateCacheSize() uint64 {
	db := c.objCounters.DB() * c.smallObjectSize
	fstree := c.objCounters.FS() * c.maxObjectSize
-	c.metrics.Estimate(db, fstree)
+	c.metrics.SetEstimateSize(db, fstree)
	return db + fstree
}

@@ -71,6 +71,7 @@ func (c *cache) initCounters() error {

	c.objCounters.cDB.Store(inDB)
	c.objCounters.cFS.Store(inFS)
+	c.metrics.SetActualCounters(inDB, inFS)

	return nil
}

@@ -79,7 +79,7 @@ func (c *cache) deleteFromDB(keys []string) []string {
	})
	for i := 0; i < errorIndex; i++ {
		c.objCounters.DecDB()
-		c.metrics.Evict(storageTypeDB)
+		c.metrics.Evict(StorageTypeDB)
		storagelog.Write(c.log,
			storagelog.AddressField(keys[i]),
			storagelog.StorageTypeField(wcStorageType),

@@ -122,7 +122,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
				storagelog.StorageTypeField(wcStorageType),
				storagelog.OpField("fstree DELETE"),
			)
-			c.metrics.Evict(storageTypeFSTree)
+			c.metrics.Evict(StorageTypeFSTree)
			c.objCounters.DecFS()
		}
	}

@@ -48,6 +48,18 @@ func newGaugeVec(opts prometheus.GaugeOpts, labelNames []string) metric[*prometh
	}
}

func newGaugeFunc(opts prometheus.GaugeOpts, f func() float64) metric[prometheus.GaugeFunc] {
	return metric[prometheus.GaugeFunc]{
		value: prometheus.NewGaugeFunc(opts, f),
		desc: Description{
			Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
			Type: dto.MetricType_GAUGE.String(),
			Help: opts.Help,
			ConstantLabels: opts.ConstLabels,
		},
	}
}

func newCounter(opts prometheus.CounterOpts) metric[prometheus.Counter] {
	return metric[prometheus.Counter]{
		value: prometheus.NewCounter(opts),

@@ -60,6 +72,32 @@ func newCounter(opts prometheus.CounterOpts) metric[prometheus.Counter] {
	}
}

func newCounterVec(opts prometheus.CounterOpts, labels []string) metric[*prometheus.CounterVec] {
	return metric[*prometheus.CounterVec]{
		value: prometheus.NewCounterVec(opts, labels),
		desc: Description{
			Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
			Type: dto.MetricType_COUNTER.String(),
			Help: opts.Help,
			ConstantLabels: opts.ConstLabels,
			VariableLabels: labels,
		},
	}
}

func newHistogramVec(opts prometheus.HistogramOpts, labelNames []string) metric[*prometheus.HistogramVec] {
	return metric[*prometheus.HistogramVec]{
		value: prometheus.NewHistogramVec(opts, labelNames),
		desc: Description{
			Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
			Type: dto.MetricType_HISTOGRAM.String(),
			Help: opts.Help,
			ConstantLabels: opts.ConstLabels,
			VariableLabels: labelNames,
		},
	}
}

// DescribeAll returns descriptions for all registered metrics.
func DescribeAll() ([]Description, error) {
	registeredDescriptionsMtx.Lock()

@@ -10,6 +10,8 @@ type NodeMetrics struct {
	stateMetrics
	replicatorMetrics
	epoch metric[prometheus.Gauge]

	writeCacheMetrics *writeCacheMetrics
}

func NewNodeMetrics() *NodeMetrics {

@@ -33,12 +35,16 @@ func NewNodeMetrics() *NodeMetrics {
	})
	mustRegister(epoch)

ale64bit marked this conversation as resolved (outdated)
ale64bit commented: s/`wcMetrcis`/`wcMetrics`
dstepanov-yadro commented: fixed

ale64bit marked this conversation as resolved (outdated)
ale64bit commented: I would personally avoid this sort of abbreviation in type names. `writeCacheMetrics` is not that long, and autocomplete helps.
dstepanov-yadro commented: fixed

	writeCacheMetrics := newWriteCacheMetrics()
	writeCacheMetrics.register()

	return &NodeMetrics{
		objectServiceMetrics: objectService,
		engineMetrics: engine,
		stateMetrics: state,
		replicatorMetrics: replicator,
		epoch: epoch,
		writeCacheMetrics: writeCacheMetrics,
	}
}

@@ -46,3 +52,11 @@ func NewNodeMetrics() *NodeMetrics {
func (m *NodeMetrics) SetEpoch(epoch uint64) {
	m.epoch.value.Set(float64(epoch))
}

// WriteCache returns WriteCache metrics.
func (m *NodeMetrics) WriteCache() WriteCacheMetrics {
	if m == nil {
		return nil
	}
	return m.writeCacheMetrics
}

pkg/metrics/writecache.go (new file, 252 lines)

@@ -0,0 +1,252 @@
package metrics

import (
	"fmt"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

const (
	wcSubsystem = "writecache"
	wcShardID = "shard_id"
	wcSuccess = "success"
	wcStorage = "storage"
	wcMode = "mode"
)

type shardIDMode struct {
	shardID, mode string
}

type WriteCacheMetrics interface {
	AddGetDuration(shardID string, success bool, d time.Duration)
	IncGetCounter(shardID string, success bool, storageType string)

	AddDeleteDuration(shardID string, success bool, d time.Duration)
	IncDeleteCounter(shardID string, success bool, storageType string)

	AddPutDuration(shardID string, success bool, d time.Duration)
	IncPutCounter(shardID string, success bool, storageType string)

	IncActualCount(shardID string, storageType string)
	DecActualCount(shardID string, storageType string)
	SetActualCount(shardID string, count uint64, storageType string)

	SetEstimateSize(shardID string, size uint64, storageType string)
	SetMode(shardID string, mode string)

	IncFlushCounter(shardID string, success bool, storageType string)
	IncEvictCounter(shardID string, storageType string)
}

ale64bit marked this conversation as resolved (outdated)
ale64bit commented: no need for pointer
dstepanov-yadro commented: fixed

type writeCacheMetrics struct {
	getDuration metric[*prometheus.HistogramVec]
	getCounter metric[*prometheus.CounterVec]

	putDuration metric[*prometheus.HistogramVec]
	putCounter metric[*prometheus.CounterVec]

	deleteDuration metric[*prometheus.HistogramVec]
	deleteCounter metric[*prometheus.CounterVec]

	flushCounter metric[*prometheus.CounterVec]
	evictCounter metric[*prometheus.CounterVec]

	actualCount metric[*prometheus.GaugeVec]

	estimatedSize metric[*prometheus.GaugeVec]

	modeMetrics map[shardIDMode]metric[prometheus.GaugeFunc]
	modeValues map[string]string
	modeMtx sync.RWMutex

ale64bit marked this conversation as resolved (outdated)
ale64bit commented: eventually we will probably want to think about better structuring this (not specific to this PR). I suspect namespacing the metrics with prefixes in method names won't scale :))
fyrchik commented: Maybe we can try it here already? Like `nodeMetrics.WriteCache()`
ale64bit commented: if @dstepanov-yadro doesn't mind, that would be ideal.
dstepanov-yadro commented: done

}

func newWriteCacheMetrics() *writeCacheMetrics {
	return &writeCacheMetrics{
		getDuration: newWCMethodDurationCounter("get"),
		getCounter: newWCMethodCounterVec("get"),
		putDuration: newWCMethodDurationCounter("put"),
		putCounter: newWCMethodCounterVec("put"),
		deleteDuration: newWCMethodDurationCounter("delete"),
		deleteCounter: newWCMethodCounterVec("delete"),
		flushCounter: newWCOperationCounterVec("flush", []string{wcShardID, wcStorage, wcSuccess}),
		evictCounter: newWCOperationCounterVec("evict", []string{wcShardID, wcStorage}),
		actualCount: newWCGaugeVec("actual_objects_count", "Actual objects count in writecache", []string{wcShardID, wcStorage}),
		estimatedSize: newWCGaugeVec("estimated_size_bytes", "Estimated writecache size", []string{wcShardID, wcStorage}),
		modeMtx: sync.RWMutex{},
		modeMetrics: make(map[shardIDMode]metric[prometheus.GaugeFunc]),
		modeValues: make(map[string]string),
	}
}

func (m *writeCacheMetrics) AddGetDuration(shardID string, success bool, d time.Duration) {
	setWriteCacheDuration(m.getDuration.value, shardID, success, d)
}

func (m *writeCacheMetrics) IncGetCounter(shardID string, success bool, storageType string) {
	incWriteCacheCounter(m.getCounter.value, shardID, success, storageType)
}

func (m *writeCacheMetrics) AddDeleteDuration(shardID string, success bool, d time.Duration) {
	setWriteCacheDuration(m.deleteDuration.value, shardID, success, d)
}

func (m *writeCacheMetrics) IncDeleteCounter(shardID string, success bool, storageType string) {
	incWriteCacheCounter(m.deleteCounter.value, shardID, success, storageType)
}

func (m *writeCacheMetrics) AddPutDuration(shardID string, success bool, d time.Duration) {
	setWriteCacheDuration(m.putDuration.value, shardID, success, d)
}

func (m *writeCacheMetrics) IncPutCounter(shardID string, success bool, storageType string) {
	incWriteCacheCounter(m.putCounter.value, shardID, success, storageType)
}

func (m *writeCacheMetrics) IncActualCount(shardID string, storageType string) {
	m.actualCount.value.With(prometheus.Labels{
		wcShardID: shardID,
		wcStorage: storageType,
	}).Inc()
}

func (m *writeCacheMetrics) DecActualCount(shardID string, storageType string) {
	m.actualCount.value.With(prometheus.Labels{
		wcShardID: shardID,
		wcStorage: storageType,
	}).Dec()
}

func (m *writeCacheMetrics) SetActualCount(shardID string, count uint64, storageType string) {
	m.actualCount.value.With(prometheus.Labels{
		wcShardID: shardID,
		wcStorage: storageType,
	}).Set(float64(count))
}

func (m *writeCacheMetrics) SetEstimateSize(shardID string, size uint64, storageType string) {
	m.estimatedSize.value.With(prometheus.Labels{
		wcShardID: shardID,
		wcStorage: storageType,
	}).Set(float64(size))
}

func (m *writeCacheMetrics) SetMode(shardID string, mode string) {
	m.modeMtx.Lock()
	defer m.modeMtx.Unlock()

	m.modeValues[shardID] = mode
	key := shardIDMode{
		shardID: shardID,
		mode: mode,
	}
	if _, found := m.modeMetrics[key]; found {
		return
	}

	metric := newGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: wcSubsystem,
			Name: "writecache_mode",
			Help: "Writecache mode value",
			ConstLabels: prometheus.Labels{
				wcShardID: shardID,
				wcMode: mode,
			},
		}, func() float64 {
			m.modeMtx.RLock()
			defer m.modeMtx.RUnlock()

			value := m.modeValues[shardID]
			if value == mode {
				return 1
			}
			return 0
		})
	mustRegister(metric)
	m.modeMetrics[key] = metric
}

func (m *writeCacheMetrics) IncFlushCounter(shardID string, success bool, storageType string) {
	m.flushCounter.value.With(prometheus.Labels{
		wcShardID: shardID,
		wcSuccess: fmt.Sprintf("%v", success),
		wcStorage: storageType,
	}).Inc()
}

ale64bit marked this conversation as resolved (outdated)
ale64bit commented: I'm not sure, but do we actually need to pass these (i.e. `m`) using pointers?
dstepanov-yadro commented: replaced with value pointer

func (m *writeCacheMetrics) IncEvictCounter(shardID string, storageType string) {
	m.evictCounter.value.With(prometheus.Labels{
		wcShardID: shardID,
		wcStorage: storageType,
	}).Inc()
}

func (m *writeCacheMetrics) register() {
	mustRegister(m.getDuration)
	mustRegister(m.getCounter)
	mustRegister(m.putDuration)
	mustRegister(m.putCounter)
	mustRegister(m.deleteDuration)
	mustRegister(m.deleteCounter)
	mustRegister(m.actualCount)
	mustRegister(m.estimatedSize)
	mustRegister(m.flushCounter)
	mustRegister(m.evictCounter)
}

func setWriteCacheDuration(m *prometheus.HistogramVec, shardID string, success bool, d time.Duration) {
	m.With(
		prometheus.Labels{
			wcShardID: shardID,
			wcSuccess: fmt.Sprintf("%v", success),
		},
	).Observe(float64(d))
}

func incWriteCacheCounter(m *prometheus.CounterVec, shardID string, success bool, storageType string) {
	m.With(prometheus.Labels{
		wcShardID: shardID,
		wcSuccess: fmt.Sprintf("%v", success),
		wcStorage: storageType,
	}).Inc()
}

func newWCMethodDurationCounter(method string) metric[*prometheus.HistogramVec] {
	return newHistogramVec(prometheus.HistogramOpts{
		Namespace: namespace,
		Subsystem: wcSubsystem,
		Name: fmt.Sprintf("%s_req_duration_seconds", method),
		Help: fmt.Sprintf("Accumulated %s request process duration", method),
	}, []string{wcShardID, wcSuccess})
}

func newWCMethodCounterVec(method string) metric[*prometheus.CounterVec] {
	return newCounterVec(prometheus.CounterOpts{
		Namespace: namespace,
		Subsystem: wcSubsystem,
		Name: fmt.Sprintf("%s_req_count", method),
		Help: fmt.Sprintf("The number of %s requests processed", method),
	}, []string{wcShardID, wcSuccess, wcStorage})
}

func newWCOperationCounterVec(operation string, labels []string) metric[*prometheus.CounterVec] {
	return newCounterVec(prometheus.CounterOpts{
		Namespace: namespace,
		Subsystem: wcSubsystem,
		Name: fmt.Sprintf("%s_operation_count", operation),
		Help: fmt.Sprintf("The number of %s operations processed", operation),
	}, labels)
}

func newWCGaugeVec(name, help string, labels []string) metric[*prometheus.GaugeVec] {
	return newGaugeVec(prometheus.GaugeOpts{
		Namespace: namespace,
		Subsystem: wcSubsystem,
		Name: name,
		Help: help,
	}, labels)
}

(review comment) We already have `AddXDuration` pattern for names in this package, why not follow?
(reply) fixed
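
One observation about the mode gauge defined in SetMode above: a separate GaugeFunc is registered for every (shard_id, mode) pair that has ever been set, each carrying those two values as constant labels, and only the series whose mode matches the shard's current mode reports 1 while the rest report 0. In exposition format that comes out roughly as follows (shard ID and mode values are illustrative; the full metric name is built from the package's namespace constant, the "writecache" subsystem and the "writecache_mode" name):

```
<namespace>_writecache_writecache_mode{shard_id="S1", mode="read-write"} 1
<namespace>_writecache_writecache_mode{shard_id="S1", mode="degraded"} 0
```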