From d212d908b5303b56e8b7eabe82560b4a8f58cf4c Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov <d.stepanov@yadro.com>
Date: Thu, 18 May 2023 17:19:41 +0300
Subject: [PATCH] [#312] wc: Add metrics

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
---
 pkg/local_object_storage/writecache/delete.go | 19 +++++++--
 pkg/local_object_storage/writecache/flush.go  | 14 +++++--
 pkg/local_object_storage/writecache/get.go    | 22 +++++++++-
 .../writecache/metrics.go                     | 42 +++++++++++++++++++
 pkg/local_object_storage/writecache/mode.go   |  6 ++-
 .../writecache/options.go                     |  9 ++++
 pkg/local_object_storage/writecache/put.go    | 23 +++++++++-
 pkg/local_object_storage/writecache/state.go  |  5 ++-
 .../writecache/storage.go                     |  2 +
 .../writecache/writecache.go                  |  2 +
 10 files changed, 130 insertions(+), 14 deletions(-)
 create mode 100644 pkg/local_object_storage/writecache/metrics.go

diff --git a/pkg/local_object_storage/writecache/delete.go b/pkg/local_object_storage/writecache/delete.go
index c1aab9e5a..ed44a8ad8 100644
--- a/pkg/local_object_storage/writecache/delete.go
+++ b/pkg/local_object_storage/writecache/delete.go
@@ -2,6 +2,7 @@ package writecache
 
 import (
 	"context"
+	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -22,6 +23,13 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
 		))
 	defer span.End()
 
+	deleted := false
+	storageType := storageTypeUndefined
+	startedAt := time.Now()
+	defer func() {
+		c.metrics.Delete(time.Since(startedAt), deleted, storageType)
+	}()
+
 	c.modeMtx.RLock()
 	defer c.modeMtx.RUnlock()
 	if c.readOnly() {
@@ -30,15 +38,15 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
 
 	saddr := addr.EncodeToString()
 
-	// Check disk cache.
-	var has int
+	var dataSize int
 	_ = c.db.View(func(tx *bbolt.Tx) error {
 		b := tx.Bucket(defaultBucket)
-		has = len(b.Get([]byte(saddr)))
+		dataSize = len(b.Get([]byte(saddr)))
 		return nil
 	})
 
-	if 0 < has {
+	if dataSize > 0 {
+		storageType = storageTypeDB
 		err := c.db.Update(func(tx *bbolt.Tx) error {
 			b := tx.Bucket(defaultBucket)
 			err := b.Delete([]byte(saddr))
@@ -52,10 +60,12 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
 			storagelog.StorageTypeField(wcStorageType),
 			storagelog.OpField("db DELETE"),
 		)
+		deleted = true
 		c.objCounters.DecDB()
 		return nil
 	}
 
+	storageType = storageTypeFSTree
 	_, err := c.fsTree.Delete(ctx, common.DeletePrm{Address: addr})
 	if err == nil {
 		storagelog.Write(c.log,
@@ -64,6 +74,7 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
 			storagelog.OpField("fstree DELETE"),
 		)
 		c.objCounters.DecFS()
+		deleted = true
 	}
 
 	return err
diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go
index c6c8a9465..09c5451ad 100644
--- a/pkg/local_object_storage/writecache/flush.go
+++ b/pkg/local_object_storage/writecache/flush.go
@@ -199,7 +199,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error {
 			return err
 		}
 
-		err = c.flushObject(ctx, &obj, data)
+		err = c.flushObject(ctx, &obj, data, storageTypeFSTree)
 		if err != nil {
 			if ignoreErrors {
 				return nil
@@ -228,7 +228,7 @@ func (c *cache) workerFlushSmall() {
 			return
 		}
 
-		err := c.flushObject(context.TODO(), obj, nil)
+		err := c.flushObject(context.TODO(), obj, nil, storageTypeDB)
 		if err != nil {
 			// Error is handled in flushObject.
 			continue
@@ -239,7 +239,13 @@ func (c *cache) workerFlushSmall() {
 }
 
 // flushObject is used to write object directly to the main storage.
-func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte) error {
+func (c *cache) flushObject(ctx context.Context, obj *object.Object, data []byte, st storageType) error {
+	var err error
+
+	defer func() {
+		c.metrics.Flush(err == nil, st)
+	}()
+
 	addr := objectCore.AddressOf(obj)
 
 	var prm common.PutPrm
@@ -313,7 +319,7 @@ func (c *cache) flush(ctx context.Context, ignoreErrors bool) error {
 				return err
 			}
 
-			if err := c.flushObject(ctx, &obj, data); err != nil {
+			if err := c.flushObject(ctx, &obj, data, storageTypeDB); err != nil {
 				return err
 			}
 		}
diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go
index 030f9b413..f8eb01091 100644
--- a/pkg/local_object_storage/writecache/get.go
+++ b/pkg/local_object_storage/writecache/get.go
@@ -2,6 +2,7 @@ package writecache
 
 import (
 	"context"
+	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -27,9 +28,22 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
 		))
 	defer span.End()
 
+	return c.getInternal(ctx, saddr, addr)
+}
+
+func (c *cache) getInternal(ctx context.Context, saddr string, addr oid.Address) (*objectSDK.Object, error) {
+	found := false
+	storageType := storageTypeUndefined
+	startedAt := time.Now()
+	defer func() {
+		c.metrics.Get(time.Since(startedAt), found, storageType)
+	}()
+
 	value, err := Get(c.db, []byte(saddr))
 	if err == nil {
 		obj := objectSDK.New()
+		found = true
+		storageType = storageTypeDB
 		return obj, obj.Unmarshal(value)
 	}
 
@@ -38,6 +52,8 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
 		return nil, logicerr.Wrap(apistatus.ObjectNotFound{})
 	}
 
+	found = true
+	storageType = storageTypeFSTree
 	return res.Object, nil
 }
 
@@ -45,13 +61,15 @@ func (c *cache) Get(ctx context.Context, addr oid.Address) (*objectSDK.Object, e
 //
 // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in write-cache.
 func (c *cache) Head(ctx context.Context, addr oid.Address) (*objectSDK.Object, error) {
+	saddr := addr.EncodeToString()
+
 	ctx, span := tracing.StartSpanFromContext(ctx, "writecache.Head",
 		trace.WithAttributes(
-			attribute.String("address", addr.EncodeToString()),
+			attribute.String("address", saddr),
 		))
 	defer span.End()
 
-	obj, err := c.Get(ctx, addr)
+	obj, err := c.getInternal(ctx, saddr, addr)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/local_object_storage/writecache/metrics.go b/pkg/local_object_storage/writecache/metrics.go
new file mode 100644
index 000000000..afa6f69f1
--- /dev/null
+++ b/pkg/local_object_storage/writecache/metrics.go
@@ -0,0 +1,42 @@
+package writecache
+
+import (
+	"time"
+
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
+)
+
+type storageType string
+
+const (
+	storageTypeUndefined storageType = "null"
+	storageTypeDB        storageType = "db"
+	storageTypeFSTree    storageType = "fstree"
+)
+
+type Metrics interface {
+	Get(d time.Duration, success bool, st storageType)
+	Delete(d time.Duration, success bool, st storageType)
+	Put(d time.Duration, success bool, st storageType)
+	Flush(success bool, st storageType)
+	Evict(st storageType)
+
+	Estimate(db, fstree uint64)
+	SetMode(m mode.Mode)
+}
+
+type metricsStub struct{}
+
+func (s *metricsStub) Get(time.Duration, bool, storageType) {}
+
+func (s *metricsStub) Delete(time.Duration, bool, storageType) {}
+
+func (s *metricsStub) Put(time.Duration, bool, storageType) {}
+
+func (s *metricsStub) Estimate(uint64, uint64) {}
+
+func (s *metricsStub) SetMode(mode.Mode) {}
+
+func (s *metricsStub) Flush(bool, storageType) {}
+
+func (s *metricsStub) Evict(storageType) {}
diff --git a/pkg/local_object_storage/writecache/mode.go b/pkg/local_object_storage/writecache/mode.go
index ca6faff4c..7e9373a42 100644
--- a/pkg/local_object_storage/writecache/mode.go
+++ b/pkg/local_object_storage/writecache/mode.go
@@ -29,7 +29,11 @@ func (c *cache) SetMode(m mode.Mode) error {
 		))
 	defer span.End()
 
-	return c.setMode(ctx, m)
+	err := c.setMode(ctx, m)
+	if err == nil {
+		c.metrics.SetMode(m)
+	}
+	return err
 }
 
 // setMode applies new mode. Must be called with cache.modeMtx lock taken.
diff --git a/pkg/local_object_storage/writecache/options.go b/pkg/local_object_storage/writecache/options.go
index 3434e9355..bea40aa36 100644
--- a/pkg/local_object_storage/writecache/options.go
+++ b/pkg/local_object_storage/writecache/options.go
@@ -60,6 +60,8 @@ type options struct {
 	reportError func(string, error)
 	// openFile is the function called internally by bbolt to open database files. Useful for hermetic testing.
 	openFile func(string, int, fs.FileMode) (*os.File, error)
+	// metrics is metrics implementation
+	metrics Metrics
 }
 
 // WithLogger sets logger.
@@ -164,3 +166,10 @@ func WithOpenFile(f func(string, int, fs.FileMode) (*os.File, error)) Option {
 		o.openFile = f
 	}
 }
+
+// WithMetrics sets metrics implementation.
+func WithMetrics(metrics Metrics) Option {
+	return func(o *options) {
+		o.metrics = metrics
+	}
+}
diff --git a/pkg/local_object_storage/writecache/put.go b/pkg/local_object_storage/writecache/put.go
index 04d818b31..c0001f926 100644
--- a/pkg/local_object_storage/writecache/put.go
+++ b/pkg/local_object_storage/writecache/put.go
@@ -3,6 +3,7 @@ package writecache
 import (
 	"context"
 	"errors"
+	"time"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
@@ -33,6 +34,13 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
 		))
 	defer span.End()
 
+	startedAt := time.Now()
+	added := false
+	storageType := storageTypeUndefined
+	defer func() {
+		c.metrics.Put(time.Since(startedAt), added, storageType)
+	}()
+
 	c.modeMtx.RLock()
 	defer c.modeMtx.RUnlock()
 	if c.readOnly() {
@@ -51,9 +59,20 @@ func (c *cache) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, erro
 	}
 
 	if sz <= c.smallObjectSize {
-		return common.PutRes{}, c.putSmall(oi)
+		storageType = storageTypeDB
+		err := c.putSmall(oi)
+		if err == nil {
+			added = true
+		}
+		return common.PutRes{}, err
 	}
-	return common.PutRes{}, c.putBig(ctx, oi.addr, prm)
+
+	storageType = storageTypeFSTree
+	err := c.putBig(ctx, oi.addr, prm)
+	if err == nil {
+		added = true
+	}
+	return common.PutRes{}, err
 }
 
 // putSmall persists small objects to the write-cache database and
diff --git a/pkg/local_object_storage/writecache/state.go b/pkg/local_object_storage/writecache/state.go
index 9c1c562b0..5f3092526 100644
--- a/pkg/local_object_storage/writecache/state.go
+++ b/pkg/local_object_storage/writecache/state.go
@@ -9,7 +9,10 @@ import (
 )
 
 func (c *cache) estimateCacheSize() uint64 {
-	return c.objCounters.DB()*c.smallObjectSize + c.objCounters.FS()*c.maxObjectSize
+	db := c.objCounters.DB() * c.smallObjectSize
+	fstree := c.objCounters.FS() * c.maxObjectSize
+	c.metrics.Estimate(db, fstree)
+	return db + fstree
 }
 
 func (c *cache) incSizeDB(sz uint64) uint64 {
diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go
index c06d16c0b..50c110c5c 100644
--- a/pkg/local_object_storage/writecache/storage.go
+++ b/pkg/local_object_storage/writecache/storage.go
@@ -79,6 +79,7 @@ func (c *cache) deleteFromDB(keys []string) []string {
 	})
 	for i := 0; i < errorIndex; i++ {
 		c.objCounters.DecDB()
+		c.metrics.Evict(storageTypeDB)
 		storagelog.Write(c.log,
 			storagelog.AddressField(keys[i]),
 			storagelog.StorageTypeField(wcStorageType),
@@ -121,6 +122,7 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
 				storagelog.StorageTypeField(wcStorageType),
 				storagelog.OpField("fstree DELETE"),
 			)
+			c.metrics.Evict(storageTypeFSTree)
 			c.objCounters.DecFS()
 		}
 	}
diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go
index 0edf4b9be..664beff80 100644
--- a/pkg/local_object_storage/writecache/writecache.go
+++ b/pkg/local_object_storage/writecache/writecache.go
@@ -103,6 +103,7 @@ func New(opts ...Option) Cache {
 			maxBatchSize:    bbolt.DefaultMaxBatchSize,
 			maxBatchDelay:   bbolt.DefaultMaxBatchDelay,
 			openFile:        os.OpenFile,
+			metrics:         &metricsStub{},
 		},
 	}
 
@@ -140,6 +141,7 @@ func (c *cache) Open(readOnly bool) error {
 
 // Init runs necessary services.
 func (c *cache) Init() error {
+	c.metrics.SetMode(c.mode)
 	c.runFlushLoop()
 	return nil
 }