Compare commits
4 commits
master
...
feat/write
Author | SHA1 | Date | |
---|---|---|---|
c81d44a06d | |||
769526d8dd | |||
47eccc1de1 | |||
d4a2d0d3e8 |
19 changed files with 514 additions and 69 deletions
|
@ -9,6 +9,7 @@ Changelog for FrostFS Node
|
||||||
- Reload pprof and metrics on SIGHUP for ir (#125)
|
- Reload pprof and metrics on SIGHUP for ir (#125)
|
||||||
- Support copies number parameter in `frostfs-cli object put` (#351)
|
- Support copies number parameter in `frostfs-cli object put` (#351)
|
||||||
- Set extra wallets on SIGHUP for ir (#125)
|
- Set extra wallets on SIGHUP for ir (#125)
|
||||||
|
- Writecache metrics (#312)
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- `frostfs-cli util locode generate` is now much faster (#309)
|
- `frostfs-cli util locode generate` is now much faster (#309)
|
||||||
|
|
|
@ -2,6 +2,8 @@ package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MetricRegister interface {
|
type MetricRegister interface {
|
||||||
|
@ -24,6 +26,8 @@ type MetricRegister interface {
|
||||||
|
|
||||||
AddToContainerSize(cnrID string, size int64)
|
AddToContainerSize(cnrID string, size int64)
|
||||||
AddToPayloadCounter(shardID string, size int64)
|
AddToPayloadCounter(shardID string, size int64)
|
||||||
|
|
||||||
|
WriteCache() metrics.WriteCacheMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
func elapsed(addFunc func(d time.Duration)) func() {
|
func elapsed(addFunc func(d time.Duration)) func() {
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/hrw"
|
"git.frostfs.info/TrueCloudLab/hrw"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
@ -98,6 +99,12 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) {
|
||||||
mw: e.metrics,
|
mw: e.metrics,
|
||||||
},
|
},
|
||||||
))
|
))
|
||||||
|
opts = append(opts, shard.WithExtraWriteCacheOptions(writecache.WithMetrics(
|
||||||
|
&writeCacheMetrics{
|
||||||
|
shardID: id.String(),
|
||||||
|
metrics: e.metrics.WriteCache(),
|
||||||
|
},
|
||||||
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
e.mtx.RUnlock()
|
e.mtx.RUnlock()
|
||||||
|
|
|
@ -2,9 +2,13 @@ package engine
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
@ -52,3 +56,52 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr
|
||||||
|
|
||||||
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
|
return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type writeCacheMetrics struct {
|
||||||
|
shardID string
|
||||||
|
metrics metrics.WriteCacheMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) Get(d time.Duration, success bool, st writecache.StorageType) {
|
||||||
|
m.metrics.AddGetDuration(m.shardID, success, d)
|
||||||
|
m.metrics.IncGetCounter(m.shardID, success, st.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) Delete(d time.Duration, success bool, st writecache.StorageType) {
|
||||||
|
m.metrics.AddDeleteDuration(m.shardID, success, d)
|
||||||
|
m.metrics.IncDeleteCounter(m.shardID, success, st.String())
|
||||||
|
if success {
|
||||||
|
m.metrics.DecActualCount(m.shardID, st.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.StorageType) {
|
||||||
|
m.metrics.AddPutDuration(m.shardID, success, d)
|
||||||
|
m.metrics.IncPutCounter(m.shardID, success, st.String())
|
||||||
|
if success {
|
||||||
|
m.metrics.IncActualCount(m.shardID, st.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) {
|
||||||
|
m.metrics.SetEstimateSize(m.shardID, db, writecache.StorageTypeDB.String())
|
||||||
|
m.metrics.SetEstimateSize(m.shardID, fstree, writecache.StorageTypeFSTree.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) SetMode(mode mode.Mode) {
|
||||||
|
m.metrics.SetMode(m.shardID, mode.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) {
|
||||||
|
m.metrics.SetActualCount(m.shardID, db, writecache.StorageTypeDB.String())
|
||||||
|
m.metrics.SetActualCount(m.shardID, fstree, writecache.StorageTypeFSTree.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) {
|
||||||
|
m.metrics.IncFlushCounter(m.shardID, success, st.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) Evict(st writecache.StorageType) {
|
||||||
|
m.metrics.DecActualCount(m.shardID, st.String())
|
||||||
|
m.metrics.IncEvictCounter(m.shardID, st.String())
|
||||||
|
}
|
||||||
|
|
|
@ -187,6 +187,13 @@ func WithWriteCacheOptions(opts ...writecache.Option) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithExtraWriteCacheOptions returns option to add extra write cache options.
|
||||||
|
func WithExtraWriteCacheOptions(opts ...writecache.Option) Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.writeCacheOpts = append(c.writeCacheOpts, opts...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithPiloramaOptions returns option to set internal write cache options.
|
// WithPiloramaOptions returns option to set internal write cache options.
|
||||||
func WithPiloramaOptions(opts ...pilorama.Option) Option {
|
func WithPiloramaOptions(opts ...pilorama.Option) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -22,6 +23,13 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
deleted := false
|
||||||
|
storageType := StorageTypeUndefined
|
||||||
|
startedAt := time.Now()
|
||||||
|
defer func() {
|
||||||
|
c.metrics.Delete(time.Since(startedAt), deleted, storageType)
|
||||||
|
}()
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
|
@ -30,15 +38,15 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
|
|
||||||
saddr := addr.EncodeToString()
|
saddr := addr.EncodeToString()
|
||||||
|
|
||||||
// Check disk cache.
|
var dataSize int
|
||||||
var has int
|
|
||||||
_ = c.db.View(func(tx *bbolt.Tx) error {
|
_ = c.db.View(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
has = len(b.Get([]byte(saddr)))
|
dataSize = len(b.Get([]byte(saddr)))
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
if 0 < has {
|
if dataSize > 0 {
|
||||||
|
storageType = StorageTypeDB
|
||||||
err := c.db.Update(func(tx *bbolt.Tx) error {
|
err := c.db.Update(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
err := b.Delete([]byte(saddr))
|
err := b.Delete([]byte(saddr))
|
||||||
|
@ -52,10 +60,12 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db DELETE"),
|
storagelog.OpField("db DELETE"),
|
||||||
)
|
)
|
||||||
|
deleted = true
|
||||||
c.objCounters.DecDB()
|
c.objCounters.DecDB()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
storageType = StorageTypeFSTree
|
||||||
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(c.log,
|
||||||
|
@ -64,6 +74,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
storagelog.OpField("fstree DELETE"),
|
storagelog.OpField("fstree DELETE"),
|
||||||
)
|
)
|
||||||
c.objCounters.DecFS()
|
c.objCounters.DecFS()
|
||||||
|
deleted = true
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -199,7 +199,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = c.flushObject(ctx, &obj, data)
|
err = c.flushObject(ctx, &obj, data, StorageTypeFSTree)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
return nil
|
return nil
|
||||||
|
@ -228,7 +228,7 @@ func (c *cache) workerFlushSmall() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := c.flushObject(context.TODO(), obj, nil)
|
err := c.flushObject(context.TODO(), obj, nil, StorageTypeDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Error is handled in flushObject.
|
// Error is handled in flushObject.
|
||||||
continue
|
continue
|
||||||
|
@ -239,7 +239,13 @@ func (c *cache) workerFlushSmall() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// flushObject is used to write object directly to the main storage.
|
// flushObject is used to write object directly to the main storage.
|
||||||
func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
|
func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte, st StorageType) error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
c.metrics.Flush(err == nil, st)
|
||||||
|
}()
|
||||||
|
|
||||||
addr := objectCore.AddressOf(obj)
|
addr := objectCore.AddressOf(obj)
|
||||||
|
|
||||||
var prm common.PutPrm
|
var prm common.PutPrm
|
||||||
|
@ -313,7 +319,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.flushObject(ctx, &obj, data); err != nil {
|
if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package writecache
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -27,9 +28,22 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
return c.getInternal(ctx, saddr, addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
|
found := false
|
||||||
|
storageType := StorageTypeUndefined
|
||||||
|
startedAt := time.Now()
|
||||||
|
defer func() {
|
||||||
|
c.metrics.Get(time.Since(startedAt), found, storageType)
|
||||||
|
}()
|
||||||
|
|
||||||
value, err := Get(c.db, []byte(saddr))
|
value, err := Get(c.db, []byte(saddr))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
obj := objectSDK.New()
|
obj := objectSDK.New()
|
||||||
|
found = true
|
||||||
|
storageType = StorageTypeDB
|
||||||
return obj, obj.Unmarshal(value)
|
return obj, obj.Unmarshal(value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,6 +52,8 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
||||||
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
|
return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
found = true
|
||||||
|
storageType = StorageTypeFSTree
|
||||||
return res.Object, nil
|
return res.Object, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,13 +61,15 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
|
||||||
//
|
//
|
||||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
|
||||||
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
|
||||||
|
saddr := addr.EncodeToString()
|
||||||
|
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
|
ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.String("address", addr.EncodeToString()),
|
attribute.String("address", saddr),
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
obj, err := c.Get(ctx, addr)
|
obj, err := c.getInternal(ctx, saddr, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
@ -12,59 +11,6 @@ import (
|
||||||
// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing.
|
// ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing.
|
||||||
var ErrNoDefaultBucket = errors.New("no default bucket")
|
var ErrNoDefaultBucket = errors.New("no default bucket")
|
||||||
|
|
||||||
// IterationPrm contains iteration parameters.
|
|
||||||
type IterationPrm struct {
|
|
||||||
handler func([]byte) error
|
|
||||||
ignoreErrors bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithHandler sets a callback to be executed on every object.
|
|
||||||
func (p *IterationPrm) WithHandler(f func([]byte) error) {
|
|
||||||
p.handler = f
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithIgnoreErrors sets a flag indicating that errors should be ignored.
|
|
||||||
func (p *IterationPrm) WithIgnoreErrors(ignore bool) {
|
|
||||||
p.ignoreErrors = ignore
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterate iterates over all objects present in write cache.
|
|
||||||
// This is very difficult to do correctly unless write-cache is put in read-only mode.
|
|
||||||
// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results.
|
|
||||||
func (c *cache) Iterate(prm IterationPrm) error {
|
|
||||||
c.modeMtx.RLock()
|
|
||||||
defer c.modeMtx.RUnlock()
|
|
||||||
if !c.readOnly() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
err := c.db.View(func(tx *bbolt.Tx) error {
|
|
||||||
b := tx.Bucket(defaultBucket)
|
|
||||||
return b.ForEach(func(k, data []byte) error {
|
|
||||||
return prm.handler(data)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var fsPrm common.IteratePrm
|
|
||||||
fsPrm.IgnoreErrors = prm.ignoreErrors
|
|
||||||
fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error {
|
|
||||||
data, err := f()
|
|
||||||
if err != nil {
|
|
||||||
if prm.ignoreErrors {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return prm.handler(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = c.fsTree.Iterate(fsPrm)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
|
// IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return.
|
||||||
// It is assumed that db is an underlying database of some WriteCache instance.
|
// It is assumed that db is an underlying database of some WriteCache instance.
|
||||||
//
|
//
|
||||||
|
|
49
pkg/local_object_storage/writecache/metrics.go
Normal file
49
pkg/local_object_storage/writecache/metrics.go
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
package writecache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
|
)
|
||||||
|
|
||||||
|
type StorageType string
|
||||||
|
|
||||||
|
func (t StorageType) String() string {
|
||||||
|
return string(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
StorageTypeUndefined StorageType = "null"
|
||||||
|
StorageTypeDB StorageType = "db"
|
||||||
|
StorageTypeFSTree StorageType = "fstree"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Metrics interface {
|
||||||
|
Get(d time.Duration, success bool, st StorageType)
|
||||||
|
Delete(d time.Duration, success bool, st StorageType)
|
||||||
|
Put(d time.Duration, success bool, st StorageType)
|
||||||
|
Flush(success bool, st StorageType)
|
||||||
|
Evict(st StorageType)
|
||||||
|
|
||||||
|
SetEstimateSize(db, fstree uint64)
|
||||||
|
SetMode(m mode.Mode)
|
||||||
|
SetActualCounters(db, fstree uint64)
|
||||||
|
}
|
||||||
|
|
||||||
|
type metricsStub struct{}
|
||||||
|
|
||||||
|
func (s *metricsStub) Get(time.Duration, bool, StorageType) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) Delete(time.Duration, bool, StorageType) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) Put(time.Duration, bool, StorageType) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) SetEstimateSize(uint64, uint64) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) SetMode(mode.Mode) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) SetActualCounters(uint64, uint64) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) Flush(bool, StorageType) {}
|
||||||
|
|
||||||
|
func (s *metricsStub) Evict(StorageType) {}
|
|
@ -29,7 +29,11 @@ func (c *cache) SetMode(m mode.Mode) error {
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
return c.setMode(ctx, m)
|
err := c.setMode(ctx, m)
|
||||||
|
if err == nil {
|
||||||
|
c.metrics.SetMode(m)
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
// setMode applies new mode. Must be called with cache.modeMtx lock taken.
|
||||||
|
|
|
@ -60,6 +60,8 @@ type options struct {
|
||||||
reportError func(string, error)
|
reportError func(string, error)
|
||||||
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
|
// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
|
||||||
openFile func(string, int, fs.FileMode) (*os.File, error)
|
openFile func(string, int, fs.FileMode) (*os.File, error)
|
||||||
|
// metrics is metrics implementation
|
||||||
|
metrics Metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger sets logger.
|
// WithLogger sets logger.
|
||||||
|
@ -164,3 +166,10 @@ func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option {
|
||||||
o.openFile = f
|
o.openFile = f
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithMetrics sets metrics implementation.
|
||||||
|
func WithMetrics(metrics Metrics) Option {
|
||||||
|
return func(o *options) {
|
||||||
|
o.metrics = metrics
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package writecache
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -33,6 +34,13 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
||||||
))
|
))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
|
startedAt := time.Now()
|
||||||
|
added := false
|
||||||
|
storageType := StorageTypeUndefined
|
||||||
|
defer func() {
|
||||||
|
c.metrics.Put(time.Since(startedAt), added, storageType)
|
||||||
|
}()
|
||||||
|
|
||||||
c.modeMtx.RLock()
|
c.modeMtx.RLock()
|
||||||
defer c.modeMtx.RUnlock()
|
defer c.modeMtx.RUnlock()
|
||||||
if c.readOnly() {
|
if c.readOnly() {
|
||||||
|
@ -51,9 +59,20 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
if sz <= c.smallObjectSize {
|
if sz <= c.smallObjectSize {
|
||||||
return common.PutRes{}, c.putSmall(oi)
|
storageType = StorageTypeDB
|
||||||
|
err := c.putSmall(oi)
|
||||||
|
if err == nil {
|
||||||
|
added = true
|
||||||
|
}
|
||||||
|
return common.PutRes{}, err
|
||||||
}
|
}
|
||||||
return common.PutRes{}, c.putBig(ctx, oi.addr, prm)
|
|
||||||
|
storageType = StorageTypeFSTree
|
||||||
|
err := c.putBig(ctx, oi.addr, prm)
|
||||||
|
if err == nil {
|
||||||
|
added = true
|
||||||
|
}
|
||||||
|
return common.PutRes{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// putSmall persists small objects to the write-cache database and
|
// putSmall persists small objects to the write-cache database and
|
||||||
|
|
|
@ -9,7 +9,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *cache) estimateCacheSize() uint64 {
|
func (c *cache) estimateCacheSize() uint64 {
|
||||||
return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize
|
db := c.objCounters.DB() * c.smallObjectSize
|
||||||
|
fstree := c.objCounters.FS() * c.maxObjectSize
|
||||||
|
c.metrics.SetEstimateSize(db, fstree)
|
||||||
|
return db + fstree
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) incSizeDB(sz uint64) uint64 {
|
func (c *cache) incSizeDB(sz uint64) uint64 {
|
||||||
|
@ -68,6 +71,7 @@ func (c *cache) initCounters() error {
|
||||||
|
|
||||||
c.objCounters.cDB.Store(inDB)
|
c.objCounters.cDB.Store(inDB)
|
||||||
c.objCounters.cFS.Store(inFS)
|
c.objCounters.cFS.Store(inFS)
|
||||||
|
c.metrics.SetActualCounters(inDB, inFS)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,6 +79,7 @@ func (c *cache) deleteFromDB(keys []string) []string {
|
||||||
})
|
})
|
||||||
for i := 0; i < errorIndex; i++ {
|
for i := 0; i < errorIndex; i++ {
|
||||||
c.objCounters.DecDB()
|
c.objCounters.DecDB()
|
||||||
|
c.metrics.Evict(StorageTypeDB)
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(c.log,
|
||||||
storagelog.AddressField(keys[i]),
|
storagelog.AddressField(keys[i]),
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
|
@ -121,6 +122,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("fstree DELETE"),
|
storagelog.OpField("fstree DELETE"),
|
||||||
)
|
)
|
||||||
|
c.metrics.Evict(StorageTypeFSTree)
|
||||||
c.objCounters.DecFS()
|
c.objCounters.DecFS()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,6 @@ type Cache interface {
|
||||||
// Returns apistatus.ObjectNotFound if object is missing in the Cache.
|
// Returns apistatus.ObjectNotFound if object is missing in the Cache.
|
||||||
// Returns ErrReadOnly if the Cache is currently in the read-only mode.
|
// Returns ErrReadOnly if the Cache is currently in the read-only mode.
|
||||||
Delete(context.Context, oid.Address) error
|
Delete(context.Context, oid.Address) error
|
||||||
Iterate(IterationPrm) error
|
|
||||||
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
Put(context.Context, common.PutPrm) (common.PutRes, error)
|
||||||
SetMode(mode.Mode) error
|
SetMode(mode.Mode) error
|
||||||
SetLogger(*logger.Logger)
|
SetLogger(*logger.Logger)
|
||||||
|
@ -104,6 +103,7 @@ func New(opts ...Option) Cache {
|
||||||
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
maxBatchSize: bbolt.DefaultMaxBatchSize,
|
||||||
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
maxBatchDelay: bbolt.DefaultMaxBatchDelay,
|
||||||
openFile: os.OpenFile,
|
openFile: os.OpenFile,
|
||||||
|
metrics: &metricsStub{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,6 +141,7 @@ func (c *cache) Open(readOnly bool) error {
|
||||||
|
|
||||||
// Init runs necessary services.
|
// Init runs necessary services.
|
||||||
func (c *cache) Init() error {
|
func (c *cache) Init() error {
|
||||||
|
c.metrics.SetMode(c.mode)
|
||||||
c.runFlushLoop()
|
c.runFlushLoop()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,18 @@ func newGaugeVec(opts prometheus.GaugeOpts, labelNames []string) metric[*prometh
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newGaugeFunc(opts prometheus.GaugeOpts, f func() float64) metric[prometheus.GaugeFunc] {
|
||||||
|
return metric[prometheus.GaugeFunc]{
|
||||||
|
value: prometheus.NewGaugeFunc(opts, f),
|
||||||
|
desc: Description{
|
||||||
|
Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
|
||||||
|
Type: dto.MetricType_GAUGE.String(),
|
||||||
|
Help: opts.Help,
|
||||||
|
ConstantLabels: opts.ConstLabels,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newCounter(opts prometheus.CounterOpts) metric[prometheus.Counter] {
|
func newCounter(opts prometheus.CounterOpts) metric[prometheus.Counter] {
|
||||||
return metric[prometheus.Counter]{
|
return metric[prometheus.Counter]{
|
||||||
value: prometheus.NewCounter(opts),
|
value: prometheus.NewCounter(opts),
|
||||||
|
@ -60,6 +72,32 @@ func newCounter(opts prometheus.CounterOpts) metric[prometheus.Counter] {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newCounterVec(opts prometheus.CounterOpts, labels []string) metric[*prometheus.CounterVec] {
|
||||||
|
return metric[*prometheus.CounterVec]{
|
||||||
|
value: prometheus.NewCounterVec(opts, labels),
|
||||||
|
desc: Description{
|
||||||
|
Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
|
||||||
|
Type: dto.MetricType_COUNTER.String(),
|
||||||
|
Help: opts.Help,
|
||||||
|
ConstantLabels: opts.ConstLabels,
|
||||||
|
VariableLabels: labels,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHistogramVec(opts prometheus.HistogramOpts, labelNames []string) metric[*prometheus.HistogramVec] {
|
||||||
|
return metric[*prometheus.HistogramVec]{
|
||||||
|
value: prometheus.NewHistogramVec(opts, labelNames),
|
||||||
|
desc: Description{
|
||||||
|
Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
|
||||||
|
Type: dto.MetricType_HISTOGRAM.String(),
|
||||||
|
Help: opts.Help,
|
||||||
|
ConstantLabels: opts.ConstLabels,
|
||||||
|
VariableLabels: labelNames,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// DescribeAll returns descriptions for all registered metrics.
|
// DescribeAll returns descriptions for all registered metrics.
|
||||||
func DescribeAll() ([]Description, error) {
|
func DescribeAll() ([]Description, error) {
|
||||||
registeredDescriptionsMtx.Lock()
|
registeredDescriptionsMtx.Lock()
|
||||||
|
|
|
@ -10,6 +10,8 @@ type NodeMetrics struct {
|
||||||
stateMetrics
|
stateMetrics
|
||||||
replicatorMetrics
|
replicatorMetrics
|
||||||
epoch metric[prometheus.Gauge]
|
epoch metric[prometheus.Gauge]
|
||||||
|
|
||||||
|
writeCacheMetrics *writeCacheMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNodeMetrics() *NodeMetrics {
|
func NewNodeMetrics() *NodeMetrics {
|
||||||
|
@ -33,12 +35,16 @@ func NewNodeMetrics() *NodeMetrics {
|
||||||
})
|
})
|
||||||
mustRegister(epoch)
|
mustRegister(epoch)
|
||||||
|
|
||||||
|
writeCacheMetrics := newWriteCacheMetrics()
|
||||||
|
writeCacheMetrics.register()
|
||||||
|
|
||||||
return &NodeMetrics{
|
return &NodeMetrics{
|
||||||
objectServiceMetrics: objectService,
|
objectServiceMetrics: objectService,
|
||||||
engineMetrics: engine,
|
engineMetrics: engine,
|
||||||
stateMetrics: state,
|
stateMetrics: state,
|
||||||
replicatorMetrics: replicator,
|
replicatorMetrics: replicator,
|
||||||
epoch: epoch,
|
epoch: epoch,
|
||||||
|
writeCacheMetrics: writeCacheMetrics,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,3 +52,11 @@ func NewNodeMetrics() *NodeMetrics {
|
||||||
func (m *NodeMetrics) SetEpoch(epoch uint64) {
|
func (m *NodeMetrics) SetEpoch(epoch uint64) {
|
||||||
m.epoch.value.Set(float64(epoch))
|
m.epoch.value.Set(float64(epoch))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteCache returns WriteCache metrics.
|
||||||
|
func (m *NodeMetrics) WriteCache() WriteCacheMetrics {
|
||||||
|
if m == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return m.writeCacheMetrics
|
||||||
|
}
|
||||||
|
|
252
pkg/metrics/writecache.go
Normal file
252
pkg/metrics/writecache.go
Normal file
|
@ -0,0 +1,252 @@
|
||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
wcSubsystem = "writecache"
|
||||||
|
wcShardID = "shard_id"
|
||||||
|
wcSuccess = "success"
|
||||||
|
wcStorage = "storage"
|
||||||
|
wcMode = "mode"
|
||||||
|
)
|
||||||
|
|
||||||
|
type shardIDMode struct {
|
||||||
|
shardID, mode string
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteCacheMetrics interface {
|
||||||
|
AddGetDuration(shardID string, success bool, d time.Duration)
|
||||||
|
IncGetCounter(shardID string, success bool, storageType string)
|
||||||
|
|
||||||
|
AddDeleteDuration(shardID string, success bool, d time.Duration)
|
||||||
|
IncDeleteCounter(shardID string, success bool, storageType string)
|
||||||
|
|
||||||
|
AddPutDuration(shardID string, success bool, d time.Duration)
|
||||||
|
IncPutCounter(shardID string, success bool, storageType string)
|
||||||
|
|
||||||
|
IncActualCount(shardID string, storageType string)
|
||||||
|
DecActualCount(shardID string, storageType string)
|
||||||
|
SetActualCount(shardID string, count uint64, storageType string)
|
||||||
|
|
||||||
|
SetEstimateSize(shardID string, size uint64, storageType string)
|
||||||
|
SetMode(shardID string, mode string)
|
||||||
|
|
||||||
|
IncFlushCounter(shardID string, success bool, storageType string)
|
||||||
|
IncEvictCounter(shardID string, storageType string)
|
||||||
|
}
|
||||||
|
|
||||||
|
type writeCacheMetrics struct {
|
||||||
|
getDuration metric[*prometheus.HistogramVec]
|
||||||
|
getCounter metric[*prometheus.CounterVec]
|
||||||
|
|
||||||
|
putDuration metric[*prometheus.HistogramVec]
|
||||||
|
putCounter metric[*prometheus.CounterVec]
|
||||||
|
|
||||||
|
deleteDuration metric[*prometheus.HistogramVec]
|
||||||
|
deleteCounter metric[*prometheus.CounterVec]
|
||||||
|
|
||||||
|
flushCounter metric[*prometheus.CounterVec]
|
||||||
|
evictCounter metric[*prometheus.CounterVec]
|
||||||
|
|
||||||
|
actualCount metric[*prometheus.GaugeVec]
|
||||||
|
|
||||||
|
estimatedSize metric[*prometheus.GaugeVec]
|
||||||
|
|
||||||
|
modeMetrics map[shardIDMode]metric[prometheus.GaugeFunc]
|
||||||
|
modeValues map[string]string
|
||||||
|
modeMtx sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWriteCacheMetrics() *writeCacheMetrics {
|
||||||
|
return &writeCacheMetrics{
|
||||||
|
getDuration: newWCMethodDurationCounter("get"),
|
||||||
|
getCounter: newWCMethodCounterVec("get"),
|
||||||
|
putDuration: newWCMethodDurationCounter("put"),
|
||||||
|
putCounter: newWCMethodCounterVec("put"),
|
||||||
|
deleteDuration: newWCMethodDurationCounter("delete"),
|
||||||
|
deleteCounter: newWCMethodCounterVec("delete"),
|
||||||
|
flushCounter: newWCOperationCounterVec("flush", []string{wcShardID, wcStorage, wcSuccess}),
|
||||||
|
evictCounter: newWCOperationCounterVec("evict", []string{wcShardID, wcStorage}),
|
||||||
|
actualCount: newWCGaugeVec("actual_objects_count", "Actual objects count in writecache", []string{wcShardID, wcStorage}),
|
||||||
|
estimatedSize: newWCGaugeVec("estimated_size_bytes", "Estimated writecache size", []string{wcShardID, wcStorage}),
|
||||||
|
modeMtx: sync.RWMutex{},
|
||||||
|
modeMetrics: make(map[shardIDMode]metric[prometheus.GaugeFunc]),
|
||||||
|
modeValues: make(map[string]string),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) AddGetDuration(shardID string, success bool, d time.Duration) {
|
||||||
|
setWriteCacheDuration(m.getDuration.value, shardID, success, d)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) IncGetCounter(shardID string, success bool, storageType string) {
|
||||||
|
incWriteCacheCounter(m.getCounter.value, shardID, success, storageType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) AddDeleteDuration(shardID string, success bool, d time.Duration) {
|
||||||
|
setWriteCacheDuration(m.deleteDuration.value, shardID, success, d)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) IncDeleteCounter(shardID string, success bool, storageType string) {
|
||||||
|
incWriteCacheCounter(m.deleteCounter.value, shardID, success, storageType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) AddPutDuration(shardID string, success bool, d time.Duration) {
|
||||||
|
setWriteCacheDuration(m.putDuration.value, shardID, success, d)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) IncPutCounter(shardID string, success bool, storageType string) {
|
||||||
|
incWriteCacheCounter(m.putCounter.value, shardID, success, storageType)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) IncActualCount(shardID string, storageType string) {
|
||||||
|
m.actualCount.value.With(prometheus.Labels{
|
||||||
|
wcShardID: shardID,
|
||||||
|
wcStorage: storageType,
|
||||||
|
}).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) DecActualCount(shardID string, storageType string) {
|
||||||
|
m.actualCount.value.With(prometheus.Labels{
|
||||||
|
wcShardID: shardID,
|
||||||
|
wcStorage: storageType,
|
||||||
|
}).Dec()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) SetActualCount(shardID string, count uint64, storageType string) {
|
||||||
|
m.actualCount.value.With(prometheus.Labels{
|
||||||
|
wcShardID: shardID,
|
||||||
|
wcStorage: storageType,
|
||||||
|
}).Set(float64(count))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) SetEstimateSize(shardID string, size uint64, storageType string) {
|
||||||
|
m.estimatedSize.value.With(prometheus.Labels{
|
||||||
|
wcShardID: shardID,
|
||||||
|
wcStorage: storageType,
|
||||||
|
}).Set(float64(size))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) SetMode(shardID string, mode string) {
|
||||||
|
m.modeMtx.Lock()
|
||||||
|
defer m.modeMtx.Unlock()
|
||||||
|
|
||||||
|
m.modeValues[shardID] = mode
|
||||||
|
key := shardIDMode{
|
||||||
|
shardID: shardID,
|
||||||
|
mode: mode,
|
||||||
|
}
|
||||||
|
if _, found := m.modeMetrics[key]; found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
metric := newGaugeFunc(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: wcSubsystem,
|
||||||
|
Name: "writecache_mode",
|
||||||
|
Help: "Writecache mode value",
|
||||||
|
ConstLabels: prometheus.Labels{
|
||||||
|
wcShardID: shardID,
|
||||||
|
wcMode: mode,
|
||||||
|
},
|
||||||
|
}, func() float64 {
|
||||||
|
m.modeMtx.RLock()
|
||||||
|
defer m.modeMtx.RUnlock()
|
||||||
|
|
||||||
|
value := m.modeValues[shardID]
|
||||||
|
if value == mode {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
})
|
||||||
|
mustRegister(metric)
|
||||||
|
m.modeMetrics[key] = metric
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) IncFlushCounter(shardID string, success bool, storageType string) {
|
||||||
|
m.flushCounter.value.With(prometheus.Labels{
|
||||||
|
wcShardID: shardID,
|
||||||
|
wcSuccess: fmt.Sprintf("%v", success),
|
||||||
|
wcStorage: storageType,
|
||||||
|
}).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) IncEvictCounter(shardID string, storageType string) {
|
||||||
|
m.evictCounter.value.With(prometheus.Labels{
|
||||||
|
wcShardID: shardID,
|
||||||
|
wcStorage: storageType,
|
||||||
|
}).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *writeCacheMetrics) register() {
|
||||||
|
mustRegister(m.getDuration)
|
||||||
|
mustRegister(m.getCounter)
|
||||||
|
mustRegister(m.putDuration)
|
||||||
|
mustRegister(m.putCounter)
|
||||||
|
mustRegister(m.deleteDuration)
|
||||||
|
mustRegister(m.deleteCounter)
|
||||||
|
mustRegister(m.actualCount)
|
||||||
|
mustRegister(m.estimatedSize)
|
||||||
|
mustRegister(m.flushCounter)
|
||||||
|
mustRegister(m.evictCounter)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setWriteCacheDuration(m *prometheus.HistogramVec, shardID string, success bool, d time.Duration) {
|
||||||
|
m.With(
|
||||||
|
prometheus.Labels{
|
||||||
|
wcShardID: shardID,
|
||||||
|
wcSuccess: fmt.Sprintf("%v", success),
|
||||||
|
},
|
||||||
|
).Observe(float64(d))
|
||||||
|
}
|
||||||
|
|
||||||
|
func incWriteCacheCounter(m *prometheus.CounterVec, shardID string, success bool, storageType string) {
|
||||||
|
m.With(prometheus.Labels{
|
||||||
|
wcShardID: shardID,
|
||||||
|
wcSuccess: fmt.Sprintf("%v", success),
|
||||||
|
wcStorage: storageType,
|
||||||
|
}).Inc()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWCMethodDurationCounter(method string) metric[*prometheus.HistogramVec] {
|
||||||
|
return newHistogramVec(prometheus.HistogramOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: wcSubsystem,
|
||||||
|
Name: fmt.Sprintf("%s_req_duration_seconds", method),
|
||||||
|
Help: fmt.Sprintf("Accumulated %s request process duration", method),
|
||||||
|
}, []string{wcShardID, wcSuccess})
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWCMethodCounterVec(method string) metric[*prometheus.CounterVec] {
|
||||||
|
return newCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: wcSubsystem,
|
||||||
|
Name: fmt.Sprintf("%s_req_count", method),
|
||||||
|
Help: fmt.Sprintf("The number of %s requests processed", method),
|
||||||
|
}, []string{wcShardID, wcSuccess, wcStorage})
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWCOperationCounterVec(operation string, labels []string) metric[*prometheus.CounterVec] {
|
||||||
|
return newCounterVec(prometheus.CounterOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: wcSubsystem,
|
||||||
|
Name: fmt.Sprintf("%s_operation_count", operation),
|
||||||
|
Help: fmt.Sprintf("The number of %s operations processed", operation),
|
||||||
|
}, labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newWCGaugeVec(name, help string, labels []string) metric[*prometheus.GaugeVec] {
|
||||||
|
return newGaugeVec(prometheus.GaugeOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: wcSubsystem,
|
||||||
|
Name: name,
|
||||||
|
Help: help,
|
||||||
|
}, labels)
|
||||||
|
}
|
Loading…
Reference in a new issue