From 3220c4df9fb513d5f6cd3b15e056cc9ea5e8f49e Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 29 May 2023 17:32:13 +0300 Subject: [PATCH] [#376] metrics: Add GC metrics Signed-off-by: Dmitrii Stepanov --- pkg/local_object_storage/engine/metrics.go | 22 +++++ pkg/local_object_storage/engine/shards.go | 21 +++-- pkg/local_object_storage/shard/delete.go | 12 ++- pkg/local_object_storage/shard/gc.go | 95 ++++++++++++++++--- pkg/local_object_storage/shard/shard.go | 7 ++ pkg/metrics/gc.go | 104 +++++++++++++++++++++ pkg/metrics/node.go | 9 ++ 7 files changed, 244 insertions(+), 26 deletions(-) create mode 100644 pkg/metrics/gc.go diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index 7a11888c..1be888ea 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -28,6 +28,7 @@ type MetricRegister interface { AddToPayloadCounter(shardID string, size int64) WriteCache() metrics.WriteCacheMetrics + GC() metrics.GCMetrics } func elapsed(addFunc func(d time.Duration)) func() { @@ -37,3 +38,24 @@ func elapsed(addFunc func(d time.Duration)) func() { addFunc(time.Since(t)) } } + +type gcMetrics struct { + storage metrics.GCMetrics + shardID string +} + +func (m *gcMetrics) AddRunDuration(d time.Duration, success bool) { + m.storage.AddRunDuration(m.shardID, d, success) +} + +func (m *gcMetrics) AddDeletedCount(deleted, failed uint64) { + m.storage.AddDeletedCount(m.shardID, deleted, failed) +} + +func (m *gcMetrics) AddExpiredObjectCollectionDuration(d time.Duration, success bool, objectType string) { + m.storage.AddExpiredObjectCollectionDuration(m.shardID, d, success, objectType) +} + +func (m *gcMetrics) AddInhumedObjectCount(count uint64, objectType string) { + m.storage.AddInhumedObjectCount(m.shardID, count, objectType) +} diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 6c49c831..07d22d3f 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -98,13 +98,20 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { id: id.String(), mw: e.metrics, }, - )) - opts = append(opts, shard.WithExtraWriteCacheOptions(writecache.WithMetrics( - &writeCacheMetrics{ - shardID: id.String(), - metrics: e.metrics.WriteCache(), - }, - ))) + ), + shard.WithExtraWriteCacheOptions(writecache.WithMetrics( + &writeCacheMetrics{ + shardID: id.String(), + metrics: e.metrics.WriteCache(), + }), + ), + shard.WithGCMetrics( + &gcMetrics{ + storage: e.metrics.GC(), + shardID: id.String(), + }, + ), + ) } e.mtx.RUnlock() diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index 4eb7ad6a..4843314d 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -21,7 +21,9 @@ type DeletePrm struct { } // DeleteRes groups the resulting values of Delete operation. -type DeleteRes struct{} +type DeleteRes struct { + deleted uint64 +} // SetAddresses is a Delete option to set the addresses of the objects to delete. // @@ -53,10 +55,11 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { return DeleteRes{}, ErrDegradedMode } + result := DeleteRes{} for _, addr := range prm.addr { select { case <-ctx.Done(): - return DeleteRes{}, ctx.Err() + return result, ctx.Err() default: } @@ -65,11 +68,12 @@ func (s *Shard) delete(ctx context.Context, prm DeletePrm) (DeleteRes, error) { s.deleteFromBlobstorSafe(ctx, addr) if err := s.deleteFromMetabase(ctx, addr); err != nil { - return DeleteRes{}, err // stop on metabase error ? + return result, err // stop on metabase error ? } + result.deleted++ } - return DeleteRes{}, nil + return result, nil } func (s *Shard) deleteObjectFromWriteCacheSafe(ctx context.Context, addr oid.Address) { diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 34a48d44..2580173c 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -67,6 +67,32 @@ type eventHandlers struct { handlers []eventHandler } +type gcRunResult struct { + success bool + deleted uint64 + failedToDelete uint64 +} + +const ( + objectTypeLock = "lock" + objectTypeTombstone = "tombstone" + objectTypeRegular = "regular" +) + +type GCMectrics interface { + AddRunDuration(d time.Duration, success bool) + AddDeletedCount(deleted, failed uint64) + AddExpiredObjectCollectionDuration(d time.Duration, success bool, objectType string) + AddInhumedObjectCount(count uint64, objectType string) +} + +type noopGCMetrics struct{} + +func (m *noopGCMetrics) AddRunDuration(time.Duration, bool) {} +func (m *noopGCMetrics) AddDeletedCount(uint64, uint64) {} +func (m *noopGCMetrics) AddExpiredObjectCollectionDuration(time.Duration, bool, string) {} +func (m *noopGCMetrics) AddInhumedObjectCount(uint64, string) {} + type gc struct { *gcCfg @@ -76,7 +102,7 @@ type gc struct { workerPool util.WorkerPool - remover func(context.Context) + remover func(context.Context) gcRunResult eventChan chan Event mEventHandler map[eventType]*eventHandlers @@ -91,6 +117,8 @@ type gcCfg struct { expiredCollectorWorkersCount int expiredCollectorBatchSize int + + metrics GCMectrics } func defaultGCCfg() gcCfg { @@ -100,6 +128,7 @@ func defaultGCCfg() gcCfg { workerPoolInit: func(int) util.WorkerPool { return nil }, + metrics: &noopGCMetrics{}, } } @@ -178,8 +207,13 @@ func (gc *gc) tickRemover(ctx context.Context) { gc.log.Debug(logs.ShardGCIsStopped) return case <-timer.C: - gc.remover(ctx) + startedAt := time.Now() + + result := gc.remover(ctx) timer.Reset(gc.removerInterval) + + gc.metrics.AddRunDuration(time.Since(startedAt), result.success) + gc.metrics.AddDeletedCount(result.deleted, result.failedToDelete) } } } @@ -196,7 +230,7 @@ func (gc *gc) stop() { // iterates over metabase and deletes objects // with GC-marked graves. // Does nothing if shard is in "read-only" mode. -func (s *Shard) removeGarbage(pctx context.Context) { +func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) { ctx, cancel := context.WithCancel(pctx) defer cancel() @@ -244,6 +278,7 @@ func (s *Shard) removeGarbage(pctx context.Context) { return } else if len(buf) == 0 { + result.success = true return } @@ -251,14 +286,20 @@ func (s *Shard) removeGarbage(pctx context.Context) { deletePrm.SetAddresses(buf...) // delete accumulated objects - _, err = s.delete(ctx, deletePrm) + res, err := s.delete(ctx, deletePrm) + + result.deleted = res.deleted + result.failedToDelete = uint64(len(buf)) - res.deleted + result.success = true + if err != nil { s.log.Warn(logs.ShardCouldNotDeleteTheObjects, zap.String("error", err.Error()), ) - - return + result.success = false } + + return } func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) { @@ -276,6 +317,13 @@ func (s *Shard) getExpiredObjectsParameters() (workersCount, batchSize int) { } func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { + var err error + startedAt := time.Now() + + defer func() { + s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular) + }() + s.log.Debug(logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) defer s.log.Debug(logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) @@ -286,7 +334,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { errGroup.Go(func() error { batch := make([]oid.Address, 0, batchSize) - err := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { + expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { if o.Type() != object.TypeTombstone && o.Type() != object.TypeLock { batch = append(batch, o.Address()) @@ -300,8 +348,8 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { } } }) - if err != nil { - return err + if expErr != nil { + return expErr } if len(batch) > 0 { @@ -315,7 +363,7 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { return nil }) - if err := errGroup.Wait(); err != nil { + if err = errGroup.Wait(); err != nil { s.log.Warn(logs.ShardIteratorOverExpiredObjectsFailed, zap.String("error", err.Error())) } } @@ -355,6 +403,7 @@ func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) return } + s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular) s.decObjectCounterBy(logical, res.AvailableInhumed()) i := 0 @@ -380,6 +429,13 @@ func (s *Shard) getExpiredWithLinked(source []oid.Address) ([]oid.Address, error } func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { + var err error + startedAt := time.Now() + + defer func() { + s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone) + }() + epoch := e.(newEpoch).epoch log := s.log.With(zap.Uint64("epoch", epoch)) @@ -413,7 +469,7 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { return } - err := s.metaBase.IterateOverGraveyard(iterPrm) + err = s.metaBase.IterateOverGraveyard(iterPrm) if err != nil { log.Error(logs.ShardIteratorOverGraveyardFailed, zap.Error(err)) s.m.RUnlock() @@ -444,6 +500,13 @@ func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { } func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { + var err error + startedAt := time.Now() + + defer func() { + s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock) + }() + s.log.Debug(logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) defer s.log.Debug(logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) @@ -455,7 +518,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { errGroup.Go(func() error { batch := make([]oid.Address, 0, batchSize) - err := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { + expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { if o.Type() == object.TypeLock { batch = append(batch, o.Address()) @@ -469,8 +532,8 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { } } }) - if err != nil { - return err + if expErr != nil { + return expErr } if len(batch) > 0 { @@ -484,7 +547,7 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { return nil }) - if err := errGroup.Wait(); err != nil { + if err = errGroup.Wait(); err != nil { s.log.Warn(logs.ShardIteratorOverExpiredLocksFailed, zap.String("error", err.Error())) } } @@ -553,6 +616,7 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston return } + s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone) s.decObjectCounterBy(logical, res.AvailableInhumed()) i := 0 @@ -598,6 +662,7 @@ func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers [] return } + s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock) s.decObjectCounterBy(logical, res.AvailableInhumed()) i := 0 diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 65cc1ef5..2123bca1 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -309,6 +309,13 @@ func WithMetricsWriter(v MetricsWriter) Option { } } +// WithGCMetrics returns option to specify storage of the GC metrics. +func WithGCMetrics(v GCMectrics) Option { + return func(c *cfg) { + c.gcCfg.metrics = v + } +} + // WithReportErrorFunc returns option to specify callback for handling storage-related errors // in the background workers. func WithReportErrorFunc(f func(selfID string, message string, err error)) Option { diff --git a/pkg/metrics/gc.go b/pkg/metrics/gc.go new file mode 100644 index 00000000..c4d5ecb5 --- /dev/null +++ b/pkg/metrics/gc.go @@ -0,0 +1,104 @@ +package metrics + +import ( + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + gcSubsystem = "garbage_collector" + gcShardID = "shard_id" + gcSuccess = "success" + gcStatus = "status" + gcDeleted = "deleted" + gcFailed = "failed_to_delete" + gcObjectType = "object_type" +) + +type GCMetrics interface { + AddRunDuration(shardID string, d time.Duration, success bool) + AddDeletedCount(shardID string, deleted, failed uint64) + AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool, objectType string) + AddInhumedObjectCount(shardID string, count uint64, objectType string) +} + +type gcMetrics struct { + runDuration metric[*prometheus.CounterVec] + deletedCounter metric[*prometheus.CounterVec] + expCollectDuration metric[*prometheus.CounterVec] + inhumedCounter metric[*prometheus.CounterVec] +} + +func (m *gcMetrics) register() { + mustRegister(m.runDuration) + mustRegister(m.deletedCounter) + mustRegister(m.expCollectDuration) + mustRegister(m.inhumedCounter) +} + +func newGCMetrics() *gcMetrics { + return &gcMetrics{ + runDuration: newCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: gcSubsystem, + Name: "delete_duration_seconds", + Help: "The total time of GC runs to delete objects from disk", + }, []string{gcShardID, gcSuccess}), + deletedCounter: newCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: gcSubsystem, + Name: "deleted_objects_count", + Help: "Total count of objects GC deleted or failed to delete from disk", + }, []string{gcShardID, gcStatus}), + expCollectDuration: newCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: gcSubsystem, + Name: "marking_duration_seconds", + Help: "The total time of GC runs to mark expired objects as removed", + }, []string{gcShardID, gcSuccess, gcObjectType}), + inhumedCounter: newCounterVec(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: gcSubsystem, + Name: "marked_for_removal_objects_count", + Help: "Total count of expired objects GC marked to remove", + }, []string{gcShardID, gcObjectType}), + } +} + +func (m *gcMetrics) AddRunDuration(shardID string, d time.Duration, success bool) { + m.runDuration.value.With(prometheus.Labels{ + gcShardID: shardID, + gcSuccess: fmt.Sprintf("%v", success), + }).Add(d.Seconds()) +} + +func (m *gcMetrics) AddDeletedCount(shardID string, deleted, failed uint64) { + m.deletedCounter.value.With( + prometheus.Labels{ + gcShardID: shardID, + gcStatus: gcDeleted, + }).Add(float64(deleted)) + m.deletedCounter.value.With( + prometheus.Labels{ + gcShardID: shardID, + gcStatus: gcFailed, + }).Add(float64(failed)) +} + +func (m *gcMetrics) AddExpiredObjectCollectionDuration(shardID string, d time.Duration, success bool, objectType string) { + m.expCollectDuration.value.With(prometheus.Labels{ + gcShardID: shardID, + gcSuccess: fmt.Sprintf("%v", success), + gcObjectType: objectType, + }).Add(d.Seconds()) +} + +func (m *gcMetrics) AddInhumedObjectCount(shardID string, count uint64, objectType string) { + m.inhumedCounter.value.With( + prometheus.Labels{ + gcShardID: shardID, + gcObjectType: objectType, + }).Add(float64(count)) +} diff --git a/pkg/metrics/node.go b/pkg/metrics/node.go index cca82b5f..526e460c 100644 --- a/pkg/metrics/node.go +++ b/pkg/metrics/node.go @@ -16,6 +16,7 @@ type NodeMetrics struct { writeCacheMetrics *writeCacheMetrics treeService *treeServiceMetrics epoch metric[prometheus.Gauge] + gc *gcMetrics } func NewNodeMetrics() *NodeMetrics { @@ -45,6 +46,9 @@ func NewNodeMetrics() *NodeMetrics { writeCacheMetrics := newWriteCacheMetrics() writeCacheMetrics.register() + gc := newGCMetrics() + gc.register() + return &NodeMetrics{ objectServiceMetrics: objectService, engineMetrics: engine, @@ -53,6 +57,7 @@ func NewNodeMetrics() *NodeMetrics { treeService: treeService, epoch: epoch, writeCacheMetrics: writeCacheMetrics, + gc: gc, } } @@ -72,3 +77,7 @@ func (m *NodeMetrics) WriteCache() WriteCacheMetrics { func (m *NodeMetrics) TreeService() tree.MetricsRegister { return m.treeService } + +func (m *NodeMetrics) GC() GCMetrics { + return m.gc +}