Write cache metrcis #378

Merged
fyrchik merged 4 commits from dstepanov-yadro/frostfs-node:feat/write-cache-metrics into master 2023-05-24 10:18:41 +00:00
19 changed files with 514 additions and 69 deletions

View file

@ -9,6 +9,7 @@ Changelog for FrostFS Node
- Reload pprof and metrics on SIGHUP for ir (#125)
- Support copies number parameter in `frostfs-cli object put` (#351)
- Set extra wallets on SIGHUP for ir (#125)
- Writecache metrics (#312)
### Changed
- `frostfs-cli util locode generate` is now much faster (#309)

View file

@ -2,6 +2,8 @@ package engine
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
)
type MetricRegister interface {
@ -24,6 +26,8 @@ type MetricRegister interface {
AddToContainerSize(cnrID string, size int64)
AddToPayloadCounter(shardID string, size int64)
WriteCache() metrics.WriteCacheMetrics
}
func elapsed(addFunc func(d time.Duration)) func() {

View file

@ -8,6 +8,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/google/uuid"
@ -98,6 +99,12 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
mw: e.metrics,
},
))
opts = append(opts, shard.WithExtraWriteCacheOptions(writecache.WithMetrics(
&writeCacheMetrics{
shardID: id.String(),
metrics: e.metrics.WriteCache(),
},
)))
}
e.mtx.RUnlock()

View file

@ -2,9 +2,13 @@ package engine
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@ -52,3 +56,52 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
}
type writeCacheMetrics struct {
shardID string
metrics metrics.WriteCacheMetrics
}
func (m *writeCacheMetrics) Get(d time.Duration, success bool, st writecache.StorageType) {
m.metrics.AddGetDuration(m.shardID, success, d)
m.metrics.IncGetCounter(m.shardID, success, st.String())
}
func (m *writeCacheMetrics) Delete(d time.Duration, success bool, st writecache.StorageType) {
m.metrics.AddDeleteDuration(m.shardID, success, d)
m.metrics.IncDeleteCounter(m.shardID, success, st.String())
if success {
m.metrics.DecActualCount(m.shardID, st.String())
}
}
func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.StorageType) {
m.metrics.AddPutDuration(m.shardID, success, d)
m.metrics.IncPutCounter(m.shardID, success, st.String())
if success {
m.metrics.IncActualCount(m.shardID, st.String())
}
}
func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) {
m.metrics.SetEstimateSize(m.shardID, db, writecache.StorageTypeDB.String())
m.metrics.SetEstimateSize(m.shardID, fstree, writecache.StorageTypeFSTree.String())
}
func (m *writeCacheMetrics) SetMode(mode mode.Mode) {
m.metrics.SetMode(m.shardID, mode.String())
}
func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) {
m.metrics.SetActualCount(m.shardID, db, writecache.StorageTypeDB.String())
m.metrics.SetActualCount(m.shardID, fstree, writecache.StorageTypeFSTree.String())
}
func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) {
m.metrics.IncFlushCounter(m.shardID, success, st.String())
}
func (m *writeCacheMetrics) Evict(st writecache.StorageType) {
m.metrics.DecActualCount(m.shardID, st.String())
m.metrics.IncEvictCounter(m.shardID, st.String())
}

View file

@ -187,6 +187,13 @@ func WithWriteCacheOptions(opts ...writecache.Option) Option {
}
}
// WithExtraWriteCacheOptions returns option to add extra write cache options.
func WithExtraWriteCacheOptions(opts ...writecache.Option) Option {
return func(c *cfg) {
c.writeCacheOpts = append(c.writeCacheOpts, opts...)
}
}
// WithPiloramaOptions returns option to set internal write cache options.
func WithPiloramaOptions(opts ...pilorama.Option) Option {
return func(c *cfg) {

View file

@ -2,6 +2,7 @@ package writecache
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -22,6 +23,13 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
))
defer span.End()
deleted := false
storageType := StorageTypeUndefined
startedAt := time.Now()
defer func() {
c.metrics.Delete(time.Since(startedAt), deleted, storageType)
}()
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if c.readOnly() {
@ -30,15 +38,15 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
saddr := addr.EncodeToString()
// Check disk cache.
var has int
var dataSize int
_ = c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
has = len(b.Get([]byte(saddr)))
dataSize = len(b.Get([]byte(saddr)))
return nil
})
if 0 < has {
if dataSize > 0 {
storageType = StorageTypeDB
err := c.db.Update(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
err := b.Delete([]byte(saddr))
@ -52,10 +60,12 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("db DELETE"),
)
deleted = true
c.objCounters.DecDB()
return nil
}
storageType = StorageTypeFSTree
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
if err == nil {
storagelog.Write(c.log,
@ -64,6 +74,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
storagelog.OpField("fstree DELETE"),
)
c.objCounters.DecFS()
deleted = true
}
return err

View file

@ -199,7 +199,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
return err
}
err = c.flushObject(ctx, &obj, data)
err = c.flushObject(ctx, &obj, data, StorageTypeFSTree)
if err != nil {
if ignoreErrors {
return nil
@ -228,7 +228,7 @@ func (c *cache) workerFlushSmall() {
return
}
err := c.flushObject(context.TODO(), obj, nil)
err := c.flushObject(context.TODO(), obj, nil, StorageTypeDB)
if err != nil {
// Error is handled in flushObject.
continue
@ -239,7 +239,13 @@ func (c *cache) workerFlushSmall() {
}
// flushObject is used to write object directly to the main storage.
func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte, st StorageType) error {
var err error
defer func() {
c.metrics.Flush(err == nil, st)
}()
addr := objectCore.AddressOf(obj)
var prm common.PutPrm
@ -313,7 +319,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
return err
}
if err := c.flushObject(ctx, &obj, data); err != nil {
if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
return err
}
}

View file

@ -2,6 +2,7 @@ package writecache
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -27,9 +28,22 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
))
defer span.End()
return c.getInternal(ctx, saddr, addr)
}
func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) {
found := false
storageType := StorageTypeUndefined
startedAt := time.Now()
defer func() {
c.metrics.Get(time.Since(startedAt), found, storageType)
}()
value, err := Get(c.db, []byte(saddr))
if err == nil {
obj := objectSDK.New()
found = true
storageType = StorageTypeDB
return obj, obj.Unmarshal(value)
}
@ -38,6 +52,8 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
}
found = true
storageType = StorageTypeFSTree
return res.Object, nil
}
@ -45,13 +61,15 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
//
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
saddr := addr.EncodeToString()
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
trace.WithAttributes(
attribute.String("address", addr.EncodeToString()),
attribute.String("address", saddr),
))
defer span.End()
obj, err := c.Get(ctx, addr)
obj, err := c.getInternal(ctx, saddr, addr)
if err != nil {
return nil, err
}

View file

@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
@ -12,59 +11,6 @@ import (
// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing.
var ErrNoDefaultBucket = errors.New("no default bucket")
// IterationPrm contains iteration parameters.
type IterationPrm struct {
handler func([]byte) error
ignoreErrors bool
}
// WithHandler sets a callback to be executed on every object.
func (p *IterationPrm) WithHandler(f func([]byte) error) {
p.handler = f
}
// WithIgnoreErrors sets a flag indicating that errors should be ignored.
func (p *IterationPrm) WithIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
}
// Iterate iterates over all objects present in write cache.
// This is very difficult to do correctly unless write-cache is put in read-only mode.
// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results.
func (c *cache) Iterate(prm IterationPrm) error {
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if !c.readOnly() {
return nil
}
err := c.db.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucket)
return b.ForEach(func(k, data []byte) error {
return prm.handler(data)
})
})
if err != nil {
return err
}
var fsPrm common.IteratePrm
fsPrm.IgnoreErrors = prm.ignoreErrors
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
data, err := f()
if err != nil {
if prm.ignoreErrors {
return nil
}
return err
}
return prm.handler(data)
}
_, err = c.fsTree.Iterate(fsPrm)
return err
}
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
// It is assumed that db is an underlying database of some WriteCache instance.
//

View file

@ -0,0 +1,49 @@
package writecache
import (
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
)
type StorageType string
func (t StorageType) String() string {
return string(t)
}
const (
StorageTypeUndefined StorageType = "null"
StorageTypeDB StorageType = "db"
StorageTypeFSTree StorageType = "fstree"
)
type Metrics interface {
Get(d time.Duration, success bool, st StorageType)
Delete(d time.Duration, success bool, st StorageType)
Put(d time.Duration, success bool, st StorageType)
Flush(success bool, st StorageType)
Evict(st StorageType)
SetEstimateSize(db, fstree uint64)
SetMode(m mode.Mode)
SetActualCounters(db, fstree uint64)
}
type metricsStub struct{}
func (s *metricsStub) Get(time.Duration, bool, StorageType) {}
func (s *metricsStub) Delete(time.Duration, bool, StorageType) {}
func (s *metricsStub) Put(time.Duration, bool, StorageType) {}
func (s *metricsStub) SetEstimateSize(uint64, uint64) {}
func (s *metricsStub) SetMode(mode.Mode) {}
func (s *metricsStub) SetActualCounters(uint64, uint64) {}
func (s *metricsStub) Flush(bool, StorageType) {}
func (s *metricsStub) Evict(StorageType) {}

View file

@ -29,7 +29,11 @@ func (c *cache) SetMode(m mode.Mode) error {
))
defer span.End()
return c.setMode(ctx, m)
err := c.setMode(ctx, m)
if err == nil {
c.metrics.SetMode(m)
}
return err
}
// setMode applies new mode. Must be called with cache.modeMtx lock taken.

View file

@ -60,6 +60,8 @@ type options struct {
reportError func(string, error)
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
openFile func(string, int, fs.FileMode) (*os.File, error)
// metrics is metrics implementation
metrics Metrics
}
// WithLogger sets logger.
@ -164,3 +166,10 @@ func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option {
o.openFile = f
}
}
// WithMetrics sets metrics implementation.
func WithMetrics(metrics Metrics) Option {
return func(o *options) {
o.metrics = metrics
}
}

View file

@ -3,6 +3,7 @@ package writecache
import (
"context"
"errors"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@ -33,6 +34,13 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
))
defer span.End()
startedAt := time.Now()
added := false
storageType := StorageTypeUndefined
defer func() {
c.metrics.Put(time.Since(startedAt), added, storageType)
}()
c.modeMtx.RLock()
defer c.modeMtx.RUnlock()
if c.readOnly() {
@ -51,9 +59,20 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
}
if sz <= c.smallObjectSize {
return common.PutRes{}, c.putSmall(oi)
storageType = StorageTypeDB
err := c.putSmall(oi)
if err == nil {
added = true
}
return common.PutRes{}, err
}
return common.PutRes{}, c.putBig(ctx, oi.addr, prm)
storageType = StorageTypeFSTree
err := c.putBig(ctx, oi.addr, prm)
if err == nil {
added = true
}
return common.PutRes{}, err
}
// putSmall persists small objects to the write-cache database and

View file

@ -9,7 +9,10 @@ import (
)
func (c *cache) estimateCacheSize() uint64 {
return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize
db := c.objCounters.DB() * c.smallObjectSize
fstree := c.objCounters.FS() * c.maxObjectSize
c.metrics.SetEstimateSize(db, fstree)
return db + fstree
}
func (c *cache) incSizeDB(sz uint64) uint64 {
@ -68,6 +71,7 @@ func (c *cache) initCounters() error {
c.objCounters.cDB.Store(inDB)
c.objCounters.cFS.Store(inFS)
c.metrics.SetActualCounters(inDB, inFS)
return nil
}

View file

@ -79,6 +79,7 @@ func (c *cache) deleteFromDB(keys []string) []string {
})
for i := 0; i < errorIndex; i++ {
c.objCounters.DecDB()
c.metrics.Evict(StorageTypeDB)
storagelog.Write(c.log,
storagelog.AddressField(keys[i]),
storagelog.StorageTypeField(wcStorageType),
@ -121,6 +122,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
storagelog.StorageTypeField(wcStorageType),
storagelog.OpField("fstree DELETE"),
)
c.metrics.Evict(StorageTypeFSTree)
c.objCounters.DecFS()
}
}

View file

@ -32,7 +32,6 @@ type Cache interface {
// Returns apistatus.ObjectNotFound if object is missing in the Cache.
// Returns ErrReadOnly if the Cache is currently in the read-only mode.
Delete(context.Context, oid.Address) error
Iterate(IterationPrm) error
Put(context.Context, common.PutPrm) (common.PutRes, error)
SetMode(mode.Mode) error
SetLogger(*logger.Logger)
@ -104,6 +103,7 @@ func New(opts ...Option) Cache {
maxBatchSize: bbolt.DefaultMaxBatchSize,
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
openFile: os.OpenFile,
metrics: &metricsStub{},
},
}
@ -141,6 +141,7 @@ func (c *cache) Open(readOnly bool) error {
// Init runs necessary services.
func (c *cache) Init() error {
c.metrics.SetMode(c.mode)
c.runFlushLoop()
return nil
}

View file

@ -48,6 +48,18 @@ func newGaugeVec(opts prometheus.GaugeOpts, labelNames []string) metric[*prometh
}
}
func newGaugeFunc(opts prometheus.GaugeOpts, f func() float64) metric[prometheus.GaugeFunc] {
return metric[prometheus.GaugeFunc]{
value: prometheus.NewGaugeFunc(opts, f),
desc: Description{
Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
Type: dto.MetricType_GAUGE.String(),
Help: opts.Help,
ConstantLabels: opts.ConstLabels,
},
}
}
func newCounter(opts prometheus.CounterOpts) metric[prometheus.Counter] {
return metric[prometheus.Counter]{
value: prometheus.NewCounter(opts),
@ -60,6 +72,32 @@ func newCounter(opts prometheus.CounterOpts) metric[prometheus.Counter] {
}
}
func newCounterVec(opts prometheus.CounterOpts, labels []string) metric[*prometheus.CounterVec] {
return metric[*prometheus.CounterVec]{
value: prometheus.NewCounterVec(opts, labels),
desc: Description{
Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
Type: dto.MetricType_COUNTER.String(),
Help: opts.Help,
ConstantLabels: opts.ConstLabels,
VariableLabels: labels,
},
}
}
func newHistogramVec(opts prometheus.HistogramOpts, labelNames []string) metric[*prometheus.HistogramVec] {
return metric[*prometheus.HistogramVec]{
value: prometheus.NewHistogramVec(opts, labelNames),
desc: Description{
Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
Type: dto.MetricType_HISTOGRAM.String(),
Help: opts.Help,
ConstantLabels: opts.ConstLabels,
VariableLabels: labelNames,
},
}
}
// DescribeAll returns descriptions for all registered metrics.
func DescribeAll() ([]Description, error) {
registeredDescriptionsMtx.Lock()

View file

@ -10,6 +10,8 @@ type NodeMetrics struct {
stateMetrics
replicatorMetrics
epoch metric[prometheus.Gauge]
writeCacheMetrics *writeCacheMetrics
}
func NewNodeMetrics() *NodeMetrics {
@ -33,12 +35,16 @@ func NewNodeMetrics() *NodeMetrics {
})
mustRegister(epoch)
writeCacheMetrics := newWriteCacheMetrics()
writeCacheMetrics.register()
return &NodeMetrics{
objectServiceMetrics: objectService,
engineMetrics: engine,
stateMetrics: state,
replicatorMetrics: replicator,
epoch: epoch,
writeCacheMetrics: writeCacheMetrics,
}
}
@ -46,3 +52,11 @@ func NewNodeMetrics() *NodeMetrics {
func (m *NodeMetrics) SetEpoch(epoch uint64) {
m.epoch.value.Set(float64(epoch))
}
// WriteCache returns WriteCache metrics.
func (m *NodeMetrics) WriteCache() WriteCacheMetrics {
if m == nil {
return nil
}
return m.writeCacheMetrics
}

252
pkg/metrics/writecache.go Normal file
View file

@ -0,0 +1,252 @@
package metrics
import (
"fmt"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
const (
wcSubsystem = "writecache"
wcShardID = "shard_id"
wcSuccess = "success"
wcStorage = "storage"
wcMode = "mode"
)
type shardIDMode struct {
shardID, mode string
}
type WriteCacheMetrics interface {
AddGetDuration(shardID string, success bool, d time.Duration)
IncGetCounter(shardID string, success bool, storageType string)
AddDeleteDuration(shardID string, success bool, d time.Duration)
IncDeleteCounter(shardID string, success bool, storageType string)
AddPutDuration(shardID string, success bool, d time.Duration)
IncPutCounter(shardID string, success bool, storageType string)
IncActualCount(shardID string, storageType string)
DecActualCount(shardID string, storageType string)
SetActualCount(shardID string, count uint64, storageType string)
SetEstimateSize(shardID string, size uint64, storageType string)
SetMode(shardID string, mode string)
IncFlushCounter(shardID string, success bool, storageType string)
IncEvictCounter(shardID string, storageType string)
}
type writeCacheMetrics struct {
getDuration metric[*prometheus.HistogramVec]
getCounter metric[*prometheus.CounterVec]
putDuration metric[*prometheus.HistogramVec]
putCounter metric[*prometheus.CounterVec]
deleteDuration metric[*prometheus.HistogramVec]
deleteCounter metric[*prometheus.CounterVec]
flushCounter metric[*prometheus.CounterVec]
evictCounter metric[*prometheus.CounterVec]
actualCount metric[*prometheus.GaugeVec]
estimatedSize metric[*prometheus.GaugeVec]
modeMetrics map[shardIDMode]metric[prometheus.GaugeFunc]
modeValues map[string]string
modeMtx sync.RWMutex
}
func newWriteCacheMetrics() *writeCacheMetrics {
return &writeCacheMetrics{
getDuration: newWCMethodDurationCounter("get"),
getCounter: newWCMethodCounterVec("get"),
putDuration: newWCMethodDurationCounter("put"),
putCounter: newWCMethodCounterVec("put"),
deleteDuration: newWCMethodDurationCounter("delete"),
deleteCounter: newWCMethodCounterVec("delete"),
flushCounter: newWCOperationCounterVec("flush", []string{wcShardID, wcStorage, wcSuccess}),
evictCounter: newWCOperationCounterVec("evict", []string{wcShardID, wcStorage}),
actualCount: newWCGaugeVec("actual_objects_count", "Actual objects count in writecache", []string{wcShardID, wcStorage}),
estimatedSize: newWCGaugeVec("estimated_size_bytes", "Estimated writecache size", []string{wcShardID, wcStorage}),
modeMtx: sync.RWMutex{},
modeMetrics: make(map[shardIDMode]metric[prometheus.GaugeFunc]),
modeValues: make(map[string]string),
}
}
func (m *writeCacheMetrics) AddGetDuration(shardID string, success bool, d time.Duration) {
setWriteCacheDuration(m.getDuration.value, shardID, success, d)
}
func (m *writeCacheMetrics) IncGetCounter(shardID string, success bool, storageType string) {
incWriteCacheCounter(m.getCounter.value, shardID, success, storageType)
}
func (m *writeCacheMetrics) AddDeleteDuration(shardID string, success bool, d time.Duration) {
setWriteCacheDuration(m.deleteDuration.value, shardID, success, d)
}
func (m *writeCacheMetrics) IncDeleteCounter(shardID string, success bool, storageType string) {
incWriteCacheCounter(m.deleteCounter.value, shardID, success, storageType)
}
func (m *writeCacheMetrics) AddPutDuration(shardID string, success bool, d time.Duration) {
setWriteCacheDuration(m.putDuration.value, shardID, success, d)
}
func (m *writeCacheMetrics) IncPutCounter(shardID string, success bool, storageType string) {
incWriteCacheCounter(m.putCounter.value, shardID, success, storageType)
}
func (m *writeCacheMetrics) IncActualCount(shardID string, storageType string) {
m.actualCount.value.With(prometheus.Labels{
wcShardID: shardID,
wcStorage: storageType,
}).Inc()
}
func (m *writeCacheMetrics) DecActualCount(shardID string, storageType string) {
m.actualCount.value.With(prometheus.Labels{
wcShardID: shardID,
wcStorage: storageType,
}).Dec()
}
func (m *writeCacheMetrics) SetActualCount(shardID string, count uint64, storageType string) {
m.actualCount.value.With(prometheus.Labels{
wcShardID: shardID,
wcStorage: storageType,
}).Set(float64(count))
}
func (m *writeCacheMetrics) SetEstimateSize(shardID string, size uint64, storageType string) {
m.estimatedSize.value.With(prometheus.Labels{
wcShardID: shardID,
wcStorage: storageType,
}).Set(float64(size))
}
func (m *writeCacheMetrics) SetMode(shardID string, mode string) {
m.modeMtx.Lock()
defer m.modeMtx.Unlock()
m.modeValues[shardID] = mode
key := shardIDMode{
shardID: shardID,
mode: mode,
}
if _, found := m.modeMetrics[key]; found {
return
}
metric := newGaugeFunc(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: wcSubsystem,
Name: "writecache_mode",
Help: "Writecache mode value",
ConstLabels: prometheus.Labels{
wcShardID: shardID,
wcMode: mode,
},
}, func() float64 {
m.modeMtx.RLock()
defer m.modeMtx.RUnlock()
value := m.modeValues[shardID]
if value == mode {
return 1
}
return 0
})
mustRegister(metric)
m.modeMetrics[key] = metric
}
func (m *writeCacheMetrics) IncFlushCounter(shardID string, success bool, storageType string) {
m.flushCounter.value.With(prometheus.Labels{
wcShardID: shardID,
wcSuccess: fmt.Sprintf("%v", success),
wcStorage: storageType,
}).Inc()
}
func (m *writeCacheMetrics) IncEvictCounter(shardID string, storageType string) {
m.evictCounter.value.With(prometheus.Labels{
wcShardID: shardID,
wcStorage: storageType,
}).Inc()
}
func (m *writeCacheMetrics) register() {
mustRegister(m.getDuration)
mustRegister(m.getCounter)
mustRegister(m.putDuration)
mustRegister(m.putCounter)
mustRegister(m.deleteDuration)
mustRegister(m.deleteCounter)
mustRegister(m.actualCount)
mustRegister(m.estimatedSize)
mustRegister(m.flushCounter)
mustRegister(m.evictCounter)
}
func setWriteCacheDuration(m *prometheus.HistogramVec, shardID string, success bool, d time.Duration) {
m.With(
prometheus.Labels{
wcShardID: shardID,
wcSuccess: fmt.Sprintf("%v", success),
},
).Observe(float64(d))
}
func incWriteCacheCounter(m *prometheus.CounterVec, shardID string, success bool, storageType string) {
m.With(prometheus.Labels{
wcShardID: shardID,
wcSuccess: fmt.Sprintf("%v", success),
wcStorage: storageType,
}).Inc()
}
func newWCMethodDurationCounter(method string) metric[*prometheus.HistogramVec] {
return newHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: wcSubsystem,
Name: fmt.Sprintf("%s_req_duration_seconds", method),
Help: fmt.Sprintf("Accumulated %s request process duration", method),
}, []string{wcShardID, wcSuccess})
}
func newWCMethodCounterVec(method string) metric[*prometheus.CounterVec] {
return newCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: wcSubsystem,
Name: fmt.Sprintf("%s_req_count", method),
Help: fmt.Sprintf("The number of %s requests processed", method),
}, []string{wcShardID, wcSuccess, wcStorage})
}
func newWCOperationCounterVec(operation string, labels []string) metric[*prometheus.CounterVec] {
return newCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: wcSubsystem,
Name: fmt.Sprintf("%s_operation_count", operation),
Help: fmt.Sprintf("The number of %s operations processed", operation),
}, labels)
}
func newWCGaugeVec(name, help string, labels []string) metric[*prometheus.GaugeVec] {
return newGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: wcSubsystem,
Name: name,
Help: help,
}, labels)
}