From d4a2d0d3e82745aec7b2bd382847edb387461810 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 18 May 2023 15:38:41 +0300 Subject: [PATCH 1/4] [#312] wc: Delete unused Iterate method Signed-off-by: Dmitrii Stepanov --- .../writecache/iterate.go | 54 ------------------- .../writecache/writecache.go | 1 - 2 files changed, 55 deletions(-) diff --git a/pkg/local_object_storage/writecache/iterate.go b/pkg/local_object_storage/writecache/iterate.go index ebe979520..59ace93bd 100644 --- a/pkg/local_object_storage/writecache/iterate.go +++ b/pkg/local_object_storage/writecache/iterate.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" ) @@ -12,59 +11,6 @@ import ( // ErrNoDefaultBucket is returned by IterateDB when default bucket for objects is missing. var ErrNoDefaultBucket = errors.New("no default bucket") -// IterationPrm contains iteration parameters. -type IterationPrm struct { - handler func([]byte) error - ignoreErrors bool -} - -// WithHandler sets a callback to be executed on every object. -func (p *IterationPrm) WithHandler(f func([]byte) error) { - p.handler = f -} - -// WithIgnoreErrors sets a flag indicating that errors should be ignored. -func (p *IterationPrm) WithIgnoreErrors(ignore bool) { - p.ignoreErrors = ignore -} - -// Iterate iterates over all objects present in write cache. -// This is very difficult to do correctly unless write-cache is put in read-only mode. -// Thus we silently fail if shard is not in read-only mode to avoid reporting misleading results. 
-func (c *cache) Iterate(prm IterationPrm) error { - c.modeMtx.RLock() - defer c.modeMtx.RUnlock() - if !c.readOnly() { - return nil - } - - err := c.db.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucket) - return b.ForEach(func(k, data []byte) error { - return prm.handler(data) - }) - }) - if err != nil { - return err - } - - var fsPrm common.IteratePrm - fsPrm.IgnoreErrors = prm.ignoreErrors - fsPrm.LazyHandler = func(addr oid.Address, f func() ([]byte, error)) error { - data, err := f() - if err != nil { - if prm.ignoreErrors { - return nil - } - return err - } - return prm.handler(data) - } - - _, err = c.fsTree.Iterate(fsPrm) - return err -} - // IterateDB iterates over all objects stored in bbolt.DB instance and passes them to f until error return. // It is assumed that db is an underlying database of some WriteCache instance. // diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 83ecf219c..0edf4b9be 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -32,7 +32,6 @@ type Cache interface { // Returns apistatus.ObjectNotFound if object is missing in the Cache. // Returns ErrReadOnly if the Cache is currently in the read-only mode. 
Delete(context.Context, oid.Address) error - Iterate(IterationPrm) error Put(context.Context, common.PutPrm) (common.PutRes, error) SetMode(mode.Mode) error SetLogger(*logger.Logger) -- 2.45.3 From 47eccc1de15db52d17b1ce50f3efbaee76b386e8 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Thu, 18 May 2023 17:19:41 +0300 Subject: [PATCH 2/4] [#312] wc: Add metrics Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/writecache/delete.go | 19 +++++++-- pkg/local_object_storage/writecache/flush.go | 14 +++++-- pkg/local_object_storage/writecache/get.go | 22 +++++++++- .../writecache/metrics.go | 42 +++++++++++++++++++ pkg/local_object_storage/writecache/mode.go | 6 ++- .../writecache/options.go | 9 ++++ pkg/local_object_storage/writecache/put.go | 23 +++++++++- pkg/local_object_storage/writecache/state.go | 5 ++- .../writecache/storage.go | 2 + .../writecache/writecache.go | 2 + 10 files changed, 130 insertions(+), 14 deletions(-) create mode 100644 pkg/local_object_storage/writecache/metrics.go diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index c1aab9e5a..ed44a8ad8 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -2,6 +2,7 @@ package writecache import ( "context" + "time" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -22,6 +23,13 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { )) defer span.End() + deleted := false + storageType := storageTypeUndefined + startedAt := time.Now() + defer func() { + c.metrics.Delete(time.Since(startedAt), deleted, storageType) + }() + c.modeMtx.RLock() defer c.modeMtx.RUnlock() if c.readOnly() { @@ -30,15 +38,15 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { saddr := addr.EncodeToString() - // Check disk cache. 
- var has int + var dataSize int _ = c.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) - has = len(b.Get([]byte(saddr))) + dataSize = len(b.Get([]byte(saddr))) return nil }) - if 0 < has { + if dataSize > 0 { + storageType = storageTypeDB err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) err := b.Delete([]byte(saddr)) @@ -52,10 +60,12 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { storagelog.StorageTypeField(wcStorageType), storagelog.OpField("db DELETE"), ) + deleted = true c.objCounters.DecDB() return nil } + storageType = storageTypeFSTree _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { storagelog.Write(c.log, @@ -64,6 +74,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { storagelog.OpField("fstree DELETE"), ) c.objCounters.DecFS() + deleted = true } return err diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index c6c8a9465..09c5451ad 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -199,7 +199,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - err = c.flushObject(ctx, &obj, data) + err = c.flushObject(ctx, &obj, data, storageTypeFSTree) if err != nil { if ignoreErrors { return nil @@ -228,7 +228,7 @@ func (c *cache) workerFlushSmall() { return } - err := c.flushObject(context.TODO(), obj, nil) + err := c.flushObject(context.TODO(), obj, nil, storageTypeDB) if err != nil { // Error is handled in flushObject. continue @@ -239,7 +239,13 @@ func (c *cache) workerFlushSmall() { } // flushObject is used to write object directly to the main storage. 
-func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error { +func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte, st storageType) error { + var err error + + defer func() { + c.metrics.Flush(err == nil, st) + }() + addr := objectCore.AddressOf(obj) var prm common.PutPrm @@ -313,7 +319,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return err } - if err := c.flushObject(ctx, &obj, data); err != nil { + if err := c.flushObject(ctx, &obj, data, storageTypeDB); err != nil { return err } } diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index 030f9b413..f8eb01091 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -2,6 +2,7 @@ package writecache import ( "context" + "time" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -27,9 +28,22 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e )) defer span.End() + return c.getInternal(ctx, saddr, addr) +} + +func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) { + found := false + storageType := storageTypeUndefined + startedAt := time.Now() + defer func() { + c.metrics.Get(time.Since(startedAt), found, storageType) + }() + value, err := Get(c.db, []byte(saddr)) if err == nil { obj := objectSDK.New() + found = true + storageType = storageTypeDB return obj, obj.Unmarshal(value) } @@ -38,6 +52,8 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e return nil, logicerr.Wrap(apistatus.ObjectNotFound{}) } + found = true + storageType = storageTypeFSTree return res.Object, nil } @@ -45,13 +61,15 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e // // Returns an error of type 
apistatus.ObjectNotFound if the requested object is missing in write-cache. func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) { + saddr := addr.EncodeToString() + ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head", trace.WithAttributes( - attribute.String("address", addr.EncodeToString()), + attribute.String("address", saddr), )) defer span.End() - obj, err := c.Get(ctx, addr) + obj, err := c.getInternal(ctx, saddr, addr) if err != nil { return nil, err } diff --git a/pkg/local_object_storage/writecache/metrics.go b/pkg/local_object_storage/writecache/metrics.go new file mode 100644 index 000000000..afa6f69f1 --- /dev/null +++ b/pkg/local_object_storage/writecache/metrics.go @@ -0,0 +1,42 @@ +package writecache + +import ( + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" +) + +type storageType string + +const ( + storageTypeUndefined storageType = "null" + storageTypeDB storageType = "db" + storageTypeFSTree storageType = "fstree" +) + +type Metrics interface { + Get(d time.Duration, success bool, st storageType) + Delete(d time.Duration, success bool, st storageType) + Put(d time.Duration, success bool, st storageType) + Flush(success bool, st storageType) + Evict(st storageType) + + Estimate(db, fstree uint64) + SetMode(m mode.Mode) +} + +type metricsStub struct{} + +func (s *metricsStub) Get(time.Duration, bool, storageType) {} + +func (s *metricsStub) Delete(time.Duration, bool, storageType) {} + +func (s *metricsStub) Put(time.Duration, bool, storageType) {} + +func (s *metricsStub) Estimate(uint64, uint64) {} + +func (s *metricsStub) SetMode(mode.Mode) {} + +func (s *metricsStub) Flush(bool, storageType) {} + +func (s *metricsStub) Evict(storageType) {} diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go index ca6faff4c..7e9373a42 100644 --- a/pkg/local_object_storage/writecache/mode.go +++ 
b/pkg/local_object_storage/writecache/mode.go @@ -29,7 +29,11 @@ func (c *cache) SetMode(m mode.Mode) error { )) defer span.End() - return c.setMode(ctx, m) + err := c.setMode(ctx, m) + if err == nil { + c.metrics.SetMode(m) + } + return err } // setMode applies new mode. Must be called with cache.modeMtx lock taken. diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go index 3434e9355..bea40aa36 100644 --- a/pkg/local_object_storage/writecache/options.go +++ b/pkg/local_object_storage/writecache/options.go @@ -60,6 +60,8 @@ type options struct { reportError func(string, error) // openFile is the function called internally by bbolt to open database files. Useful for hermetic testing. openFile func(string, int, fs.FileMode) (*os.File, error) + // metrics is metrics implementation + metrics Metrics } // WithLogger sets logger. @@ -164,3 +166,10 @@ func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option { o.openFile = f } } + +// WithMetrics sets metrics implementation. 
+func WithMetrics(metrics Metrics) Option { + return func(o *options) { + o.metrics = metrics + } +} diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go index 04d818b31..c0001f926 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -3,6 +3,7 @@ package writecache import ( "context" "errors" + "time" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common" @@ -33,6 +34,13 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro )) defer span.End() + startedAt := time.Now() + added := false + storageType := storageTypeUndefined + defer func() { + c.metrics.Put(time.Since(startedAt), added, storageType) + }() + c.modeMtx.RLock() defer c.modeMtx.RUnlock() if c.readOnly() { @@ -51,9 +59,20 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro } if sz <= c.smallObjectSize { - return common.PutRes{}, c.putSmall(oi) + storageType = storageTypeDB + err := c.putSmall(oi) + if err == nil { + added = true + } + return common.PutRes{}, err } - return common.PutRes{}, c.putBig(ctx, oi.addr, prm) + + storageType = storageTypeFSTree + err := c.putBig(ctx, oi.addr, prm) + if err == nil { + added = true + } + return common.PutRes{}, err } // putSmall persists small objects to the write-cache database and diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index 9c1c562b0..5f3092526 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -9,7 +9,10 @@ import ( ) func (c *cache) estimateCacheSize() uint64 { - return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize + db := c.objCounters.DB() * c.smallObjectSize + fstree := c.objCounters.FS() * c.maxObjectSize + c.metrics.Estimate(db, fstree) + return db + 
fstree } func (c *cache) incSizeDB(sz uint64) uint64 { diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index c06d16c0b..50c110c5c 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -79,6 +79,7 @@ func (c *cache) deleteFromDB(keys []string) []string { }) for i := 0; i < errorIndex; i++ { c.objCounters.DecDB() + c.metrics.Evict(storageTypeDB) storagelog.Write(c.log, storagelog.AddressField(keys[i]), storagelog.StorageTypeField(wcStorageType), @@ -121,6 +122,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string { storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree DELETE"), ) + c.metrics.Evict(storageTypeFSTree) c.objCounters.DecFS() } } diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 0edf4b9be..664beff80 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -103,6 +103,7 @@ func New(opts ...Option) Cache { maxBatchSize: bbolt.DefaultMaxBatchSize, maxBatchDelay: bbolt.DefaultMaxBatchDelay, openFile: os.OpenFile, + metrics: &metricsStub{}, }, } @@ -140,6 +141,7 @@ func (c *cache) Open(readOnly bool) error { // Init runs necessary services. 
func (c *cache) Init() error { + c.metrics.SetMode(c.mode) c.runFlushLoop() return nil } -- 2.45.3 From 769526d8dd4052da13247bc3f46f443e91b558cd Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 19 May 2023 11:17:19 +0300 Subject: [PATCH 3/4] [#312] metrics: Add writecache metrics Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/engine/metrics.go | 4 + pkg/local_object_storage/engine/shards.go | 7 + pkg/local_object_storage/engine/writecache.go | 53 ++++ pkg/local_object_storage/shard/shard.go | 7 + pkg/local_object_storage/writecache/delete.go | 6 +- pkg/local_object_storage/writecache/flush.go | 8 +- pkg/local_object_storage/writecache/get.go | 6 +- .../writecache/metrics.go | 39 +-- pkg/local_object_storage/writecache/put.go | 6 +- pkg/local_object_storage/writecache/state.go | 3 +- .../writecache/storage.go | 4 +- pkg/metrics/desc.go | 38 +++ pkg/metrics/node.go | 14 + pkg/metrics/writecache.go | 252 ++++++++++++++++++ 14 files changed, 415 insertions(+), 32 deletions(-) create mode 100644 pkg/metrics/writecache.go diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index 13dcdfe02..7a11888c5 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -2,6 +2,8 @@ package engine import ( "time" + + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" ) type MetricRegister interface { @@ -24,6 +26,8 @@ type MetricRegister interface { AddToContainerSize(cnrID string, size int64) AddToPayloadCounter(shardID string, size int64) + + WriteCache() metrics.WriteCacheMetrics } func elapsed(addFunc func(d time.Duration)) func() { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index e16d2c498..6c49c8312 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -8,6 +8,7 @@ import ( 
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/hrw" "github.com/google/uuid" @@ -98,6 +99,12 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { mw: e.metrics, }, )) + opts = append(opts, shard.WithExtraWriteCacheOptions(writecache.WithMetrics( + &writeCacheMetrics{ + shardID: id.String(), + metrics: e.metrics.WriteCache(), + }, + ))) } e.mtx.RUnlock() diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go index 4effb2b16..ea2299b8b 100644 --- a/pkg/local_object_storage/engine/writecache.go +++ b/pkg/local_object_storage/engine/writecache.go @@ -2,9 +2,13 @@ package engine import ( "context" + "time" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/metrics" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) @@ -52,3 +56,52 @@ func (e *StorageEngine) FlushWriteCache(ctx context.Context, p FlushWriteCachePr return FlushWriteCacheRes{}, sh.FlushWriteCache(ctx, prm) } + +type writeCacheMetrics struct { + shardID string + metrics metrics.WriteCacheMetrics +} + +func (m *writeCacheMetrics) Get(d time.Duration, success bool, st writecache.StorageType) { + m.metrics.AddGetDuration(m.shardID, success, d) + m.metrics.IncGetCounter(m.shardID, success, st.String()) +} + +func (m *writeCacheMetrics) Delete(d 
time.Duration, success bool, st writecache.StorageType) { + m.metrics.AddDeleteDuration(m.shardID, success, d) + m.metrics.IncDeleteCounter(m.shardID, success, st.String()) + if success { + m.metrics.DecActualCount(m.shardID, st.String()) + } +} + +func (m *writeCacheMetrics) Put(d time.Duration, success bool, st writecache.StorageType) { + m.metrics.AddPutDuration(m.shardID, success, d) + m.metrics.IncPutCounter(m.shardID, success, st.String()) + if success { + m.metrics.IncActualCount(m.shardID, st.String()) + } +} + +func (m *writeCacheMetrics) SetEstimateSize(db, fstree uint64) { + m.metrics.SetEstimateSize(m.shardID, db, writecache.StorageTypeDB.String()) + m.metrics.SetEstimateSize(m.shardID, fstree, writecache.StorageTypeFSTree.String()) +} + +func (m *writeCacheMetrics) SetMode(mode mode.Mode) { + m.metrics.SetMode(m.shardID, mode.String()) +} + +func (m *writeCacheMetrics) SetActualCounters(db, fstree uint64) { + m.metrics.SetActualCount(m.shardID, db, writecache.StorageTypeDB.String()) + m.metrics.SetActualCount(m.shardID, fstree, writecache.StorageTypeFSTree.String()) +} + +func (m *writeCacheMetrics) Flush(success bool, st writecache.StorageType) { + m.metrics.IncFlushCounter(m.shardID, success, st.String()) +} + +func (m *writeCacheMetrics) Evict(st writecache.StorageType) { + m.metrics.DecActualCount(m.shardID, st.String()) + m.metrics.IncEvictCounter(m.shardID, st.String()) +} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 44ec54645..e0059105f 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -187,6 +187,13 @@ func WithWriteCacheOptions(opts ...writecache.Option) Option { } } +// WithExtraWriteCacheOptions returns option to add extra write cache options. +func WithExtraWriteCacheOptions(opts ...writecache.Option) Option { + return func(c *cfg) { + c.writeCacheOpts = append(c.writeCacheOpts, opts...) 
+ } +} + // WithPiloramaOptions returns option to set internal write cache options. func WithPiloramaOptions(opts ...pilorama.Option) Option { return func(c *cfg) { diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go index ed44a8ad8..f5a292ed4 100644 --- a/pkg/local_object_storage/writecache/delete.go +++ b/pkg/local_object_storage/writecache/delete.go @@ -24,7 +24,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { defer span.End() deleted := false - storageType := storageTypeUndefined + storageType := StorageTypeUndefined startedAt := time.Now() defer func() { c.metrics.Delete(time.Since(startedAt), deleted, storageType) @@ -46,7 +46,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { }) if dataSize > 0 { - storageType = storageTypeDB + storageType = StorageTypeDB err := c.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket(defaultBucket) err := b.Delete([]byte(saddr)) @@ -65,7 +65,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error { return nil } - storageType = storageTypeFSTree + storageType = StorageTypeFSTree _, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr}) if err == nil { storagelog.Write(c.log, diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 09c5451ad..28409f609 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -199,7 +199,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return err } - err = c.flushObject(ctx, &obj, data, storageTypeFSTree) + err = c.flushObject(ctx, &obj, data, StorageTypeFSTree) if err != nil { if ignoreErrors { return nil @@ -228,7 +228,7 @@ func (c *cache) workerFlushSmall() { return } - err := c.flushObject(context.TODO(), obj, nil, storageTypeDB) + err := c.flushObject(context.TODO(), obj, nil, StorageTypeDB) if err != nil { // Error is handled 
in flushObject. continue @@ -239,7 +239,7 @@ func (c *cache) workerFlushSmall() { } // flushObject is used to write object directly to the main storage. -func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte, st storageType) error { +func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte, st StorageType) error { var err error defer func() { @@ -319,7 +319,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return err } - if err := c.flushObject(ctx, &obj, data, storageTypeDB); err != nil { + if err := c.flushObject(ctx, &obj, data, StorageTypeDB); err != nil { return err } } diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index f8eb01091..f8f6de9b0 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -33,7 +33,7 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) { found := false - storageType := storageTypeUndefined + storageType := StorageTypeUndefined startedAt := time.Now() defer func() { c.metrics.Get(time.Since(startedAt), found, storageType) @@ -43,7 +43,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) if err == nil { obj := objectSDK.New() found = true - storageType = storageTypeDB + storageType = StorageTypeDB return obj, obj.Unmarshal(value) } @@ -53,7 +53,7 @@ func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) } found = true - storageType = storageTypeFSTree + storageType = StorageTypeFSTree return res.Object, nil } diff --git a/pkg/local_object_storage/writecache/metrics.go b/pkg/local_object_storage/writecache/metrics.go index afa6f69f1..e20c7e65e 100644 --- a/pkg/local_object_storage/writecache/metrics.go +++ b/pkg/local_object_storage/writecache/metrics.go @@ -6,37 
+6,44 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" ) -type storageType string +type StorageType string + +func (t StorageType) String() string { + return string(t) +} const ( - storageTypeUndefined storageType = "null" - storageTypeDB storageType = "db" - storageTypeFSTree storageType = "fstree" + StorageTypeUndefined StorageType = "null" + StorageTypeDB StorageType = "db" + StorageTypeFSTree StorageType = "fstree" ) type Metrics interface { - Get(d time.Duration, success bool, st storageType) - Delete(d time.Duration, success bool, st storageType) - Put(d time.Duration, success bool, st storageType) - Flush(success bool, st storageType) - Evict(st storageType) + Get(d time.Duration, success bool, st StorageType) + Delete(d time.Duration, success bool, st StorageType) + Put(d time.Duration, success bool, st StorageType) + Flush(success bool, st StorageType) + Evict(st StorageType) - Estimate(db, fstree uint64) + SetEstimateSize(db, fstree uint64) SetMode(m mode.Mode) + SetActualCounters(db, fstree uint64) } type metricsStub struct{} -func (s *metricsStub) Get(time.Duration, bool, storageType) {} +func (s *metricsStub) Get(time.Duration, bool, StorageType) {} -func (s *metricsStub) Delete(time.Duration, bool, storageType) {} +func (s *metricsStub) Delete(time.Duration, bool, StorageType) {} -func (s *metricsStub) Put(time.Duration, bool, storageType) {} +func (s *metricsStub) Put(time.Duration, bool, StorageType) {} -func (s *metricsStub) Estimate(uint64, uint64) {} +func (s *metricsStub) SetEstimateSize(uint64, uint64) {} func (s *metricsStub) SetMode(mode.Mode) {} -func (s *metricsStub) Flush(bool, storageType) {} +func (s *metricsStub) SetActualCounters(uint64, uint64) {} -func (s *metricsStub) Evict(storageType) {} +func (s *metricsStub) Flush(bool, StorageType) {} + +func (s *metricsStub) Evict(StorageType) {} diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go 
index c0001f926..1e99e4a28 100644 --- a/pkg/local_object_storage/writecache/put.go +++ b/pkg/local_object_storage/writecache/put.go @@ -36,7 +36,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro startedAt := time.Now() added := false - storageType := storageTypeUndefined + storageType := StorageTypeUndefined defer func() { c.metrics.Put(time.Since(startedAt), added, storageType) }() @@ -59,7 +59,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro } if sz <= c.smallObjectSize { - storageType = storageTypeDB + storageType = StorageTypeDB err := c.putSmall(oi) if err == nil { added = true @@ -67,7 +67,7 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro return common.PutRes{}, err } - storageType = storageTypeFSTree + storageType = StorageTypeFSTree err := c.putBig(ctx, oi.addr, prm) if err == nil { added = true diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go index 5f3092526..14103e626 100644 --- a/pkg/local_object_storage/writecache/state.go +++ b/pkg/local_object_storage/writecache/state.go @@ -11,7 +11,7 @@ import ( func (c *cache) estimateCacheSize() uint64 { db := c.objCounters.DB() * c.smallObjectSize fstree := c.objCounters.FS() * c.maxObjectSize - c.metrics.Estimate(db, fstree) + c.metrics.SetEstimateSize(db, fstree) return db + fstree } @@ -71,6 +71,7 @@ func (c *cache) initCounters() error { c.objCounters.cDB.Store(inDB) c.objCounters.cFS.Store(inFS) + c.metrics.SetActualCounters(inDB, inFS) return nil } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 50c110c5c..3bd3813d1 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -79,7 +79,7 @@ func (c *cache) deleteFromDB(keys []string) []string { }) for i := 0; i < errorIndex; i++ { c.objCounters.DecDB() - 
c.metrics.Evict(storageTypeDB) + c.metrics.Evict(StorageTypeDB) storagelog.Write(c.log, storagelog.AddressField(keys[i]), storagelog.StorageTypeField(wcStorageType), @@ -122,7 +122,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string { storagelog.StorageTypeField(wcStorageType), storagelog.OpField("fstree DELETE"), ) - c.metrics.Evict(storageTypeFSTree) + c.metrics.Evict(StorageTypeFSTree) c.objCounters.DecFS() } } diff --git a/pkg/metrics/desc.go b/pkg/metrics/desc.go index 74d2d4e6e..612435b2f 100644 --- a/pkg/metrics/desc.go +++ b/pkg/metrics/desc.go @@ -48,6 +48,18 @@ func newGaugeVec(opts prometheus.GaugeOpts, labelNames []string) metric[*prometh } } +func newGaugeFunc(opts prometheus.GaugeOpts, f func() float64) metric[prometheus.GaugeFunc] { + return metric[prometheus.GaugeFunc]{ + value: prometheus.NewGaugeFunc(opts, f), + desc: Description{ + Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), + Type: dto.MetricType_GAUGE.String(), + Help: opts.Help, + ConstantLabels: opts.ConstLabels, + }, + } +} + func newCounter(opts prometheus.CounterOpts) metric[prometheus.Counter] { return metric[prometheus.Counter]{ value: prometheus.NewCounter(opts), @@ -60,6 +72,32 @@ func newCounter(opts prometheus.CounterOpts) metric[prometheus.Counter] { } } +func newCounterVec(opts prometheus.CounterOpts, labels []string) metric[*prometheus.CounterVec] { + return metric[*prometheus.CounterVec]{ + value: prometheus.NewCounterVec(opts, labels), + desc: Description{ + Name: prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), + Type: dto.MetricType_COUNTER.String(), + Help: opts.Help, + ConstantLabels: opts.ConstLabels, + VariableLabels: labels, + }, + } +} + +func newHistogramVec(opts prometheus.HistogramOpts, labelNames []string) metric[*prometheus.HistogramVec] { + return metric[*prometheus.HistogramVec]{ + value: prometheus.NewHistogramVec(opts, labelNames), + desc: Description{ + Name: 
prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name), + Type: dto.MetricType_HISTOGRAM.String(), + Help: opts.Help, + ConstantLabels: opts.ConstLabels, + VariableLabels: labelNames, + }, + } +} + // DescribeAll returns descriptions for all registered metrics. func DescribeAll() ([]Description, error) { registeredDescriptionsMtx.Lock() diff --git a/pkg/metrics/node.go b/pkg/metrics/node.go index bf12e610f..b8041eec8 100644 --- a/pkg/metrics/node.go +++ b/pkg/metrics/node.go @@ -10,6 +10,8 @@ type NodeMetrics struct { stateMetrics replicatorMetrics epoch metric[prometheus.Gauge] + + writeCacheMetrics *writeCacheMetrics } func NewNodeMetrics() *NodeMetrics { @@ -33,12 +35,16 @@ func NewNodeMetrics() *NodeMetrics { }) mustRegister(epoch) + writeCacheMetrics := newWriteCacheMetrics() + writeCacheMetrics.register() + return &NodeMetrics{ objectServiceMetrics: objectService, engineMetrics: engine, stateMetrics: state, replicatorMetrics: replicator, epoch: epoch, + writeCacheMetrics: writeCacheMetrics, } } @@ -46,3 +52,11 @@ func NewNodeMetrics() *NodeMetrics { func (m *NodeMetrics) SetEpoch(epoch uint64) { m.epoch.value.Set(float64(epoch)) } + +// WriteCache returns WriteCache metrics. 
+func (m *NodeMetrics) WriteCache() WriteCacheMetrics {
+	if m == nil {
+		return nil
+	}
+	return m.writeCacheMetrics
+}
diff --git a/pkg/metrics/writecache.go b/pkg/metrics/writecache.go
new file mode 100644
index 000000000..890903546
--- /dev/null
+++ b/pkg/metrics/writecache.go
@@ -0,0 +1,293 @@
+package metrics
+
+import (
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// Subsystem and label names shared by the write-cache metrics.
+const (
+	wcSubsystem = "writecache"
+	wcShardID   = "shard_id"
+	wcSuccess   = "success"
+	wcStorage   = "storage"
+	wcMode      = "mode"
+)
+
+// shardIDMode is the key of the per-shard, per-mode gauges created by SetMode.
+type shardIDMode struct {
+	shardID, mode string
+}
+
+// WriteCacheMetrics collects write-cache metrics: per-method durations and
+// counters, object count and size gauges, the current mode, and flush/evict
+// counters.
+type WriteCacheMetrics interface {
+	AddGetDuration(shardID string, success bool, d time.Duration)
+	IncGetCounter(shardID string, success bool, storageType string)
+
+	AddDeleteDuration(shardID string, success bool, d time.Duration)
+	IncDeleteCounter(shardID string, success bool, storageType string)
+
+	AddPutDuration(shardID string, success bool, d time.Duration)
+	IncPutCounter(shardID string, success bool, storageType string)
+
+	IncActualCount(shardID string, storageType string)
+	DecActualCount(shardID string, storageType string)
+	SetActualCount(shardID string, count uint64, storageType string)
+
+	SetEstimateSize(shardID string, size uint64, storageType string)
+	SetMode(shardID string, mode string)
+
+	IncFlushCounter(shardID string, success bool, storageType string)
+	IncEvictCounter(shardID string, storageType string)
+}
+
+// writeCacheMetrics is the Prometheus-backed implementation of WriteCacheMetrics.
+type writeCacheMetrics struct {
+	getDuration metric[*prometheus.HistogramVec]
+	getCounter  metric[*prometheus.CounterVec]
+
+	putDuration metric[*prometheus.HistogramVec]
+	putCounter  metric[*prometheus.CounterVec]
+
+	deleteDuration metric[*prometheus.HistogramVec]
+	deleteCounter  metric[*prometheus.CounterVec]
+
+	flushCounter metric[*prometheus.CounterVec]
+	evictCounter metric[*prometheus.CounterVec]
+
+	actualCount metric[*prometheus.GaugeVec]
+
+	estimatedSize metric[*prometheus.GaugeVec]
+
+	// modeMetrics holds one gauge per (shard, mode) pair passed to SetMode;
+	// modeValues holds the latest mode of each shard. modeMtx guards both.
+	modeMetrics map[shardIDMode]metric[prometheus.GaugeFunc]
+	modeValues  map[string]string
+	modeMtx     sync.RWMutex
+}
+
+// newWriteCacheMetrics creates all write-cache metrics without registering
+// them; call register to do that.
+func newWriteCacheMetrics() *writeCacheMetrics {
+	return &writeCacheMetrics{
+		getDuration:    newWCMethodDurationCounter("get"),
+		getCounter:     newWCMethodCounterVec("get"),
+		putDuration:    newWCMethodDurationCounter("put"),
+		putCounter:     newWCMethodCounterVec("put"),
+		deleteDuration: newWCMethodDurationCounter("delete"),
+		deleteCounter:  newWCMethodCounterVec("delete"),
+		flushCounter:   newWCOperationCounterVec("flush", []string{wcShardID, wcStorage, wcSuccess}),
+		evictCounter:   newWCOperationCounterVec("evict", []string{wcShardID, wcStorage}),
+		actualCount:    newWCGaugeVec("actual_objects_count", "Actual objects count in writecache", []string{wcShardID, wcStorage}),
+		estimatedSize:  newWCGaugeVec("estimated_size_bytes", "Estimated writecache size", []string{wcShardID, wcStorage}),
+		modeMtx:        sync.RWMutex{},
+		modeMetrics:    make(map[shardIDMode]metric[prometheus.GaugeFunc]),
+		modeValues:     make(map[string]string),
+	}
+}
+
+// AddGetDuration observes the duration of a get request.
+func (m *writeCacheMetrics) AddGetDuration(shardID string, success bool, d time.Duration) {
+	setWriteCacheDuration(m.getDuration.value, shardID, success, d)
+}
+
+// IncGetCounter counts a get request.
+func (m *writeCacheMetrics) IncGetCounter(shardID string, success bool, storageType string) {
+	incWriteCacheCounter(m.getCounter.value, shardID, success, storageType)
+}
+
+// AddDeleteDuration observes the duration of a delete request.
+func (m *writeCacheMetrics) AddDeleteDuration(shardID string, success bool, d time.Duration) {
+	setWriteCacheDuration(m.deleteDuration.value, shardID, success, d)
+}
+
+// IncDeleteCounter counts a delete request.
+func (m *writeCacheMetrics) IncDeleteCounter(shardID string, success bool, storageType string) {
+	incWriteCacheCounter(m.deleteCounter.value, shardID, success, storageType)
+}
+
+// AddPutDuration observes the duration of a put request.
+func (m *writeCacheMetrics) AddPutDuration(shardID string, success bool, d time.Duration) {
+	setWriteCacheDuration(m.putDuration.value, shardID, success, d)
+}
+
+// IncPutCounter counts a put request.
+func (m *writeCacheMetrics) IncPutCounter(shardID string, success bool, storageType string) {
+	incWriteCacheCounter(m.putCounter.value, shardID, success, storageType)
+}
+
+// IncActualCount increments the object count of the shard's storage type.
+func (m *writeCacheMetrics) IncActualCount(shardID string, storageType string) {
+	m.actualCount.value.With(prometheus.Labels{
+		wcShardID: shardID,
+		wcStorage: storageType,
+	}).Inc()
+}
+
+// DecActualCount decrements the object count of the shard's storage type.
+func (m *writeCacheMetrics) DecActualCount(shardID string, storageType string) {
+	m.actualCount.value.With(prometheus.Labels{
+		wcShardID: shardID,
+		wcStorage: storageType,
+	}).Dec()
+}
+
+// SetActualCount sets the object count of the shard's storage type.
+func (m *writeCacheMetrics) SetActualCount(shardID string, count uint64, storageType string) {
+	m.actualCount.value.With(prometheus.Labels{
+		wcShardID: shardID,
+		wcStorage: storageType,
+	}).Set(float64(count))
+}
+
+// SetEstimateSize sets the estimated size of the shard's storage type.
+func (m *writeCacheMetrics) SetEstimateSize(shardID string, size uint64, storageType string) {
+	m.estimatedSize.value.With(prometheus.Labels{
+		wcShardID: shardID,
+		wcStorage: storageType,
+	}).Set(float64(size))
+}
+
+// SetMode records mode as the current mode of the shard. The first time a
+// (shard, mode) pair is seen, a gauge with these constant labels is registered;
+// it reports 1 while mode is the shard's current mode and 0 otherwise.
+//
+// NOTE(review): Subsystem already is "writecache", so the full metric name
+// repeats it ("..._writecache_writecache_mode"); kept unchanged here.
+func (m *writeCacheMetrics) SetMode(shardID string, mode string) {
+	m.modeMtx.Lock()
+	defer m.modeMtx.Unlock()
+
+	m.modeValues[shardID] = mode
+	key := shardIDMode{
+		shardID: shardID,
+		mode:    mode,
+	}
+	if _, found := m.modeMetrics[key]; found {
+		return
+	}
+
+	metric := newGaugeFunc(
+		prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: wcSubsystem,
+			Name:      "writecache_mode",
+			Help:      "Writecache mode value",
+			ConstLabels: prometheus.Labels{
+				wcShardID: shardID,
+				wcMode:    mode,
+			},
+		}, func() float64 {
+			m.modeMtx.RLock()
+			defer m.modeMtx.RUnlock()
+
+			value := m.modeValues[shardID]
+			if value == mode {
+				return 1
+			}
+			return 0
+		})
+	mustRegister(metric)
+	m.modeMetrics[key] = metric
+}
+
+// IncFlushCounter counts a flush operation.
+func (m *writeCacheMetrics) IncFlushCounter(shardID string, success bool, storageType string) {
+	m.flushCounter.value.With(prometheus.Labels{
+		wcShardID: shardID,
+		wcSuccess: strconv.FormatBool(success),
+		wcStorage: storageType,
+	}).Inc()
+}
+
+// IncEvictCounter counts an evict operation.
+func (m *writeCacheMetrics) IncEvictCounter(shardID string, storageType string) {
+	m.evictCounter.value.With(prometheus.Labels{
+		wcShardID: shardID,
+		wcStorage: storageType,
+	}).Inc()
+}
+
+// register registers the metrics created by newWriteCacheMetrics.
+func (m *writeCacheMetrics) register() {
+	mustRegister(m.getDuration)
+	mustRegister(m.getCounter)
+	mustRegister(m.putDuration)
+	mustRegister(m.putCounter)
+	mustRegister(m.deleteDuration)
+	mustRegister(m.deleteCounter)
+	mustRegister(m.actualCount)
+	mustRegister(m.estimatedSize)
+	mustRegister(m.flushCounter)
+	mustRegister(m.evictCounter)
+}
+
+// setWriteCacheDuration observes d in seconds, the unit named by the
+// "_req_duration_seconds" histograms. float64(d) would record nanoseconds.
+func setWriteCacheDuration(m *prometheus.HistogramVec, shardID string, success bool, d time.Duration) {
+	m.With(
+		prometheus.Labels{
+			wcShardID: shardID,
+			wcSuccess: strconv.FormatBool(success),
+		},
+	).Observe(d.Seconds())
+}
+
+// incWriteCacheCounter increments m for the shard, outcome and storage type.
+func incWriteCacheCounter(m *prometheus.CounterVec, shardID string, success bool, storageType string) {
+	m.With(prometheus.Labels{
+		wcShardID: shardID,
+		wcSuccess: strconv.FormatBool(success),
+		wcStorage: storageType,
+	}).Inc()
+}
+
+// newWCMethodDurationCounter creates the "<method>_req_duration_seconds"
+// histogram labeled by shard ID and success.
+func newWCMethodDurationCounter(method string) metric[*prometheus.HistogramVec] {
+	return newHistogramVec(prometheus.HistogramOpts{
+		Namespace: namespace,
+		Subsystem: wcSubsystem,
+		Name:      fmt.Sprintf("%s_req_duration_seconds", method),
+		Help:      fmt.Sprintf("Accumulated %s request process duration", method),
+	}, []string{wcShardID, wcSuccess})
+}
+
+// newWCMethodCounterVec creates the "<method>_req_count" counter labeled by
+// shard ID, success and storage type.
+func newWCMethodCounterVec(method string) metric[*prometheus.CounterVec] {
+	return newCounterVec(prometheus.CounterOpts{
+		Namespace: namespace,
+		Subsystem: wcSubsystem,
+		Name:      fmt.Sprintf("%s_req_count", method),
+		Help:      fmt.Sprintf("The number of %s requests processed", method),
+	}, []string{wcShardID, wcSuccess, wcStorage})
+}
+
+// newWCOperationCounterVec creates the "<operation>_operation_count" counter
+// with the given labels.
+func newWCOperationCounterVec(operation string, labels []string) metric[*prometheus.CounterVec] {
+	return newCounterVec(prometheus.CounterOpts{
+		Namespace: namespace,
+		Subsystem: wcSubsystem,
+		Name:      fmt.Sprintf("%s_operation_count", operation),
+		Help:      fmt.Sprintf("The number of %s operations processed", operation),
+	}, labels)
+}
+
+// newWCGaugeVec creates a write-cache gauge with the given name, help text
+// and labels.
+func newWCGaugeVec(name, help string, labels []string) metric[*prometheus.GaugeVec] {
+	return newGaugeVec(prometheus.GaugeOpts{
+		Namespace: namespace,
+		Subsystem: wcSubsystem,
+		Name:      name,
+		Help:      help,
+	}, labels)
+}
-- 
2.45.3

From c81d44a06ddf952e2633794dba3f5aa3a10e9d18 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov
Date: Mon, 22 May 2023 10:10:25 +0300
Subject: [PATCH 4/4] [#312] node: Add WC metrics info to CHANGELOG

Signed-off-by: Dmitrii Stepanov
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f79940f76..b5924b781 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ Changelog for FrostFS Node
 - Reload pprof and metrics on SIGHUP for ir (#125)
 - Support copies number parameter in `frostfs-cli object put` (#351)
 - Set extra wallets on SIGHUP for ir (#125)
+- Writecache metrics (#312)
 
 ### Changed
 - `frostfs-cli util locode generate` is now much faster (#309)
-- 
2.45.3