[#1596] metrics: Extract gc and writecache metrics from engine
All checks were successful
Tests and linters / Run gofumpt (pull_request) Successful in 29s
DCO action / DCO (pull_request) Successful in 42s
Vulncheck / Vulncheck (pull_request) Successful in 54s
Pre-commit hooks / Pre-commit (pull_request) Successful in 1m22s
Build / Build Components (pull_request) Successful in 1m29s
Tests and linters / Staticcheck (pull_request) Successful in 1m57s
Tests and linters / Lint (pull_request) Successful in 2m45s
Tests and linters / Tests (pull_request) Successful in 3m14s
Tests and linters / Tests with -race (pull_request) Successful in 3m57s
Tests and linters / gopls check (pull_request) Successful in 4m2s

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
Anton Nikiforov 2024-12-02 22:47:26 +03:00
parent ceac1c8709
commit a0b9b86394
16 changed files with 65 additions and 72 deletions

View file

@ -883,7 +883,10 @@ func (c *cfg) engineOpts() []engine.Option {
)
if c.metricsCollector != nil {
opts = append(opts, engine.WithMetrics(c.metricsCollector.Engine()))
opts = append(opts,
engine.WithEngineMetrics(c.metricsCollector.Engine()),
engine.WithWritecacheMetrics(c.metricsCollector.Writecache()),
engine.WithGCMetrics(c.metricsCollector.GC()))
}
return opts
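
As a point of reference, here is a minimal sketch of the caller side after this change, assuming the engine and internal metrics packages are imported and the Engine()/Writecache()/GC() accessors added further down are available; the newEngineOpts helper itself is hypothetical, and only the three engine.With*Metrics options come from this commit:

// Hypothetical helper: builds the metric-related engine options when a
// collector is configured, leaving the engine on its noop defaults otherwise.
func newEngineOpts(collector *metrics.NodeMetrics) []engine.Option {
	var opts []engine.Option
	if collector != nil {
		opts = append(opts,
			engine.WithEngineMetrics(collector.Engine()),         // method timings, object/error counters
			engine.WithWritecacheMetrics(collector.Writecache()), // per-shard writecache counters
			engine.WithGCMetrics(collector.GC()),                 // per-shard GC counters
		)
	}
	return opts
}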

View file

@ -28,9 +28,6 @@ type EngineMetrics interface {
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
SetEvacuationInProgress(shardID string, value bool)
WriteCache() WriteCacheMetrics
GC() GCMetrics
}
type engineMetrics struct {
@ -182,14 +179,6 @@ func (m *engineMetrics) SetMode(shardID string, mode mode.Mode) {
m.mode.SetMode(shardID, mode.String())
}
func (m *engineMetrics) WriteCache() WriteCacheMetrics {
return m.writeCache
}
func (m *engineMetrics) GC() GCMetrics {
return m.gc
}
func (m *engineMetrics) IncRefillObjectsCount(shardID, path string, size int, success bool) {
m.refillObjCounter.With(
prometheus.Labels{

View file

@ -79,6 +79,14 @@ func (m *NodeMetrics) Engine() EngineMetrics {
return m.engine
}
func (m *NodeMetrics) Writecache() WriteCacheMetrics {
return m.engine.writeCache
}
func (m *NodeMetrics) GC() GCMetrics {
return m.engine.gc
}
func (m *NodeMetrics) State() StateMetrics {
return m.state
}

View file

@ -45,7 +45,7 @@ func (r ListContainersRes) Containers() []cid.ID {
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) ContainerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes, err error) {
defer elapsed("ContainerSize", e.metrics.AddMethodDuration)()
defer elapsed("ContainerSize", e.engineMetrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
res, err = e.containerSize(ctx, prm)
@ -93,7 +93,7 @@ func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm)
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) ListContainers(ctx context.Context, _ ListContainersPrm) (res ListContainersRes, err error) {
defer elapsed("ListContainers", e.metrics.AddMethodDuration)()
defer elapsed("ListContainers", e.engineMetrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
res, err = e.listContainers(ctx)

View file

@ -58,7 +58,7 @@ func (e *StorageEngine) Delete(ctx context.Context, prm DeletePrm) (res DeleteRe
attribute.Bool("force_removal", prm.forceRemoval),
))
defer span.End()
defer elapsed("Delete", e.metrics.AddMethodDuration)()
defer elapsed("Delete", e.engineMetrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
res, err = e.delete(ctx, prm)

View file

@ -145,7 +145,7 @@ func (e *StorageEngine) reportShardError(
}
errCount := sh.errorCount.Add(1)
e.metrics.IncErrorCounter(sh.ID().String())
e.engineMetrics.IncErrorCounter(sh.ID().String())
sid := sh.ID()
e.log.Warn(ctx, msg, append([]zap.Field{
@ -187,7 +187,9 @@ type cfg struct {
errorsThreshold uint32
metrics MetricRegister
engineMetrics Metrics
writeCacheMetrics WriteCacheMetrics
gcMetrics GCMetrics
shardPoolSize uint32
@ -200,7 +202,10 @@ func defaultCfg() *cfg {
res := &cfg{
log: logger.NewLoggerWrapper(zap.L()),
shardPoolSize: 20,
metrics: noopMetrics{},
engineMetrics: noopMetrics{},
writeCacheMetrics: noopWriteCacheMetrics{},
gcMetrics: noopGCMetrics{},
}
res.containerSource.Store(&containerSource{})
return res
@ -231,9 +236,21 @@ func WithLogger(l *logger.Logger) Option {
}
}
func WithMetrics(v MetricRegister) Option {
func WithEngineMetrics(v Metrics) Option {
return func(c *cfg) {
c.metrics = v
c.engineMetrics = v
}
}
func WithWritecacheMetrics(v WriteCacheMetrics) Option {
return func(c *cfg) {
c.writeCacheMetrics = v
}
}
func WithGCMetrics(v GCMetrics) Option {
return func(c *cfg) {
c.gcMetrics = v
}
}
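
For context, a sketch of how these functional options compose onto the defaults; cfg, Option, defaultCfg and the three constructors above come from the patch, while the newCfg loop below is only an illustration of what the engine constructor effectively does:

// Illustration only: each register starts as a noop in defaultCfg and is
// replaced independently when the corresponding option is supplied.
func newCfg(opts ...Option) *cfg {
	c := defaultCfg() // engineMetrics, writeCacheMetrics, gcMetrics are noops here
	for _, o := range opts {
		o(c) // each With*Metrics option overwrites exactly one field
	}
	return c
}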

View file

@ -56,7 +56,7 @@ func (e *StorageEngine) Get(ctx context.Context, prm GetPrm) (res GetRes, err er
attribute.String("address", prm.addr.EncodeToString()),
))
defer span.End()
defer elapsed("Get", e.metrics.AddMethodDuration)()
defer elapsed("Get", e.engineMetrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
res, err = e.get(ctx, prm)

View file

@ -68,7 +68,7 @@ func (e *StorageEngine) Head(ctx context.Context, prm HeadPrm) (res HeadRes, err
func (e *StorageEngine) head(ctx context.Context, prm HeadPrm) (HeadRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.head")
defer span.End()
defer elapsed("Head", e.metrics.AddMethodDuration)()
defer elapsed("Head", e.engineMetrics.AddMethodDuration)()
var (
head *objectSDK.Object

View file

@ -70,7 +70,7 @@ var errInhumeFailure = errors.New("inhume operation failed")
func (e *StorageEngine) Inhume(ctx context.Context, prm InhumePrm) (res InhumeRes, err error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.Inhume")
defer span.End()
defer elapsed("Inhume", e.metrics.AddMethodDuration)()
defer elapsed("Inhume", e.engineMetrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
res, err = e.inhume(ctx, prm)
@ -396,7 +396,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid
}
for id := range idMap {
e.metrics.DeleteContainerSize(id.EncodeToString())
e.engineMetrics.DeleteContainerSize(id.EncodeToString())
}
}
@ -474,7 +474,7 @@ func (e *StorageEngine) processZeroCountContainers(ctx context.Context, ids []ci
}
for id := range idMap {
e.metrics.DeleteContainerCount(id.EncodeToString())
e.engineMetrics.DeleteContainerCount(id.EncodeToString())
}
}

View file

@ -101,7 +101,7 @@ func (l ListWithCursorRes) Cursor() *Cursor {
func (e *StorageEngine) ListWithCursor(ctx context.Context, prm ListWithCursorPrm) (ListWithCursorRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "StorageEngine.ListWithCursor")
defer span.End()
defer elapsed("ListWithCursor", e.metrics.AddMethodDuration)()
defer elapsed("ListWithCursor", e.engineMetrics.AddMethodDuration)()
result := make([]objectcore.Info, 0, prm.count)

View file

@ -32,7 +32,7 @@ func (e *StorageEngine) Lock(ctx context.Context, idCnr cid.ID, locker oid.ID, l
attribute.Int("locked_count", len(locked)),
))
defer span.End()
defer elapsed("Lock", e.metrics.AddMethodDuration)()
defer elapsed("Lock", e.engineMetrics.AddMethodDuration)()
return e.execIfNotBlocked(func() error {
return e.lock(ctx, idCnr, locker, locked)

View file

@ -7,34 +7,12 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
)
type MetricRegister interface {
AddMethodDuration(method string, d time.Duration)
SetObjectCounter(shardID, objectType string, v uint64)
AddToObjectCounter(shardID, objectType string, delta int)
SetMode(shardID string, mode mode.Mode)
AddToContainerSize(cnrID string, size int64)
DeleteContainerSize(cnrID string)
DeleteContainerCount(cnrID string)
AddToPayloadCounter(shardID string, size int64)
IncErrorCounter(shardID string)
ClearErrorCounter(shardID string)
DeleteShardMetrics(shardID string)
SetContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncContainerObjectCounter(shardID, contID, objectType string)
SubContainerObjectCounter(shardID, contID, objectType string, v uint64)
IncRefillObjectsCount(shardID, path string, size int, success bool)
SetRefillPercent(shardID, path string, percent uint32)
SetRefillStatus(shardID, path, status string)
SetEvacuationInProgress(shardID string, value bool)
WriteCache() metrics.WriteCacheMetrics
GC() metrics.GCMetrics
}
type (
Metrics = metrics.EngineMetrics
GCMetrics = metrics.GCMetrics
WriteCacheMetrics = metrics.WriteCacheMetrics
NullBool = metrics.NullBool
)
func elapsed(method string, addFunc func(method string, d time.Duration)) func() {
t := time.Now()
@ -76,9 +54,9 @@ type (
)
var (
_ MetricRegister = noopMetrics{}
_ metrics.WriteCacheMetrics = noopWriteCacheMetrics{}
_ metrics.GCMetrics = noopGCMetrics{}
_ Metrics = noopMetrics{}
_ WriteCacheMetrics = noopWriteCacheMetrics{}
_ GCMetrics = noopGCMetrics{}
)
func (noopMetrics) AddMethodDuration(string, time.Duration) {}
@ -99,8 +77,6 @@ func (noopMetrics) IncRefillObjectsCount(string, string, int, bool) {}
func (noopMetrics) SetRefillPercent(string, string, uint32) {}
func (noopMetrics) SetRefillStatus(string, string, string) {}
func (noopMetrics) SetEvacuationInProgress(string, bool) {}
func (noopMetrics) WriteCache() metrics.WriteCacheMetrics { return noopWriteCacheMetrics{} }
func (noopMetrics) GC() metrics.GCMetrics { return noopGCMetrics{} }
func (noopWriteCacheMetrics) AddMethodDuration(string, string, string, string, bool, time.Duration) {}
func (noopWriteCacheMetrics) SetActualCount(string, string, string, uint64) {}
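
Because Metrics, WriteCacheMetrics and GCMetrics are now plain type aliases for the interfaces in the metrics package, a custom register only has to satisfy those interfaces to be passed to the new options. A hedged sketch mirroring the noop assertions above; myRegister, myWCRegister and myGCRegister are hypothetical types, not part of the patch:

// Compile-time checks a consumer could add for its own implementations.
var (
	_ engine.Metrics           = (*myRegister)(nil)
	_ engine.WriteCacheMetrics = (*myWCRegister)(nil)
	_ engine.GCMetrics         = (*myGCRegister)(nil)
)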

View file

@ -56,7 +56,7 @@ func (e *StorageEngine) Put(ctx context.Context, prm PutPrm) (err error) {
attribute.String("address", object.AddressOf(prm.Object).EncodeToString()),
))
defer span.End()
defer elapsed("Put", e.metrics.AddMethodDuration)()
defer elapsed("Put", e.engineMetrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
err = e.put(ctx, prm)

View file

@ -72,7 +72,7 @@ func (e *StorageEngine) GetRange(ctx context.Context, prm RngPrm) (res RngRes, e
attribute.String("length", strconv.FormatUint(prm.ln, 10)),
))
defer span.End()
defer elapsed("GetRange", e.metrics.AddMethodDuration)()
defer elapsed("GetRange", e.engineMetrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
res, err = e.getRange(ctx, prm)

View file

@ -51,7 +51,7 @@ func (e *StorageEngine) Select(ctx context.Context, prm SelectPrm) (res SelectRe
attribute.String("container_id", prm.cnr.EncodeToString()),
))
defer span.End()
defer elapsed("Select", e.metrics.AddMethodDuration)()
defer elapsed("Select", e.engineMetrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
res, err = e._select(ctx, prm)
@ -98,7 +98,7 @@ func (e *StorageEngine) _select(ctx context.Context, prm SelectPrm) (SelectRes,
//
// Returns an error if executions are blocked (see BlockExecution).
func (e *StorageEngine) List(ctx context.Context, limit uint64) (res SelectRes, err error) {
defer elapsed("List", e.metrics.AddMethodDuration)()
defer elapsed("List", e.engineMetrics.AddMethodDuration)()
err = e.execIfNotBlocked(func() error {
res, err = e.list(ctx, limit)
return err

View file

@ -28,7 +28,7 @@ type hashedShard struct {
type metricsWithID struct {
id string
mw MetricRegister
mw Metrics
}
func (m *metricsWithID) SetShardID(id string) {
@ -116,7 +116,7 @@ func (e *StorageEngine) AddShard(ctx context.Context, opts ...shard.Option) (*sh
return nil, fmt.Errorf("add %s shard: %w", sh.ID().String(), err)
}
e.cfg.metrics.SetMode(sh.ID().String(), sh.GetMode())
e.cfg.engineMetrics.SetMode(sh.ID().String(), sh.GetMode())
return sh.ID(), nil
}
@ -154,18 +154,18 @@ func (e *StorageEngine) appendMetrics(id *shard.ID, opts []shard.Option) []shard
shard.WithMetricsWriter(
&metricsWithID{
id: id.String(),
mw: e.metrics,
mw: e.engineMetrics,
},
),
shard.WithWriteCacheMetrics(
&writeCacheMetrics{
shardID: id.String(),
metrics: e.metrics.WriteCache(),
metrics: e.writeCacheMetrics,
},
),
shard.WithGCMetrics(
&gcMetrics{
storage: e.metrics.GC(),
storage: e.gcMetrics,
shardID: id.String(),
},
),
@ -217,7 +217,7 @@ func (e *StorageEngine) removeShards(ctx context.Context, ids ...string) {
continue
}
e.metrics.DeleteShardMetrics(id)
e.engineMetrics.DeleteShardMetrics(id)
ss = append(ss, sh)
delete(e.shards, id)
@ -318,7 +318,7 @@ func (e *StorageEngine) SetShardMode(ctx context.Context, id *shard.ID, m mode.M
if id.String() == shID {
if resetErrorCounter {
sh.errorCount.Store(0)
e.metrics.ClearErrorCounter(shID)
e.engineMetrics.ClearErrorCounter(shID)
}
return sh.SetMode(ctx, m)
}
@ -422,7 +422,7 @@ func (e *StorageEngine) deleteShards(ctx context.Context, ids []*shard.ID) ([]ha
for _, sh := range ss {
idStr := sh.ID().String()
e.metrics.DeleteShardMetrics(idStr)
e.engineMetrics.DeleteShardMetrics(idStr)
delete(e.shards, idStr)