From 4b8b4da6818ecce7700dd432f8a6ed5d8be86106 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 27 Dec 2023 18:23:12 +0300 Subject: [PATCH] [#864] engine: Drop container count metric if container removed Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 3 + pkg/local_object_storage/engine/inhume.go | 82 ++++++++++- pkg/local_object_storage/engine/metrics.go | 1 + pkg/local_object_storage/engine/shards.go | 1 + pkg/local_object_storage/metabase/counter.go | 131 +++++++++++++++++- .../metabase/counter_test.go | 18 +-- pkg/local_object_storage/shard/container.go | 59 ++++++++ pkg/local_object_storage/shard/gc.go | 13 ++ .../shard/metrics_test.go | 6 +- pkg/local_object_storage/shard/shard.go | 29 ++-- pkg/metrics/engine.go | 5 + 11 files changed, 310 insertions(+), 38 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 3075e66e..420c6612 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -560,8 +560,11 @@ const ( ShardGCCollectingExpiredMetricsStarted = "collecting expired metrics started" ShardGCCollectingExpiredMetricsCompleted = "collecting expired metrics completed" ShardGCFailedToCollectZeroSizeContainers = "failed to collect zero-size containers" + ShardGCFailedToCollectZeroCountContainers = "failed to collect zero-count containers" EngineFailedToCheckContainerAvailability = "failed to check container availability" EngineFailedToGetContainerSize = "failed to get container size" EngineFailedToDeleteContainerSize = "failed to delete container size" EngineInterruptProcessingZeroSizeContainers = "interrupt processing zero-size containers" + EngineInterruptProcessingZeroCountContainers = "interrupt processing zero-count containers" + EngineFailedToGetContainerCounters = "failed to get container counters" ) diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 6750adec..fc6ee243 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -266,7 +266,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid return } - idMap, err := e.selectNonExistedIDs(ctx, ids) + idMap, err := e.selectNonExistentIDs(ctx, ids) if err != nil { return } @@ -339,7 +339,85 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid } } -func (e *StorageEngine) selectNonExistedIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) { +func (e *StorageEngine) processZeroCountContainers(ctx context.Context, ids []cid.ID) { + if len(ids) == 0 { + return + } + + idMap, err := e.selectNonExistentIDs(ctx, ids) + if err != nil { + return + } + + if len(idMap) == 0 { + return + } + + var failed bool + var prm shard.ContainerCountPrm + e.iterateOverUnsortedShards(func(sh hashedShard) bool { + select { + case <-ctx.Done(): + e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err())) + failed = true + return true + default: + } + + var drop []cid.ID + for id := range idMap { + prm.ContainerID = id + s, err := sh.ContainerCount(ctx, prm) + if err != nil { + e.log.Warn(logs.EngineFailedToGetContainerCounters, zap.Stringer("container_id", id), zap.Error(err)) + failed = true + return true + } + if s.User > 0 || s.Logic > 0 || s.Phy > 0 { + drop = append(drop, id) + } + } + for _, id := range drop { + delete(idMap, id) + } + + return len(idMap) == 0 + }) + + if failed || len(idMap) == 0 { + return + } + + e.iterateOverUnsortedShards(func(sh hashedShard) bool { + select { + case <-ctx.Done(): + e.log.Info(logs.EngineInterruptProcessingZeroCountContainers, zap.Error(ctx.Err())) + failed = true + return true + default: + } + + for id := range idMap { + if err := sh.DeleteContainerSize(ctx, id); err != nil { + e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err)) + failed = true + return true + } + } + + return false + }) + + if failed { + return + } + + for id := range idMap { + e.metrics.DeleteContainerCount(id.EncodeToString()) + } +} + +func (e *StorageEngine) selectNonExistentIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) { cs := e.containerSource.Load() idMap := make(map[cid.ID]struct{}) diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index e4cc504f..e4117bb1 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -17,6 +17,7 @@ type MetricRegister interface { AddToContainerSize(cnrID string, size int64) DeleteContainerSize(cnrID string) + DeleteContainerCount(cnrID string) AddToPayloadCounter(shardID string, size int64) IncErrorCounter(shardID string) ClearErrorCounter(shardID string) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index cb3a4eca..0455471e 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -120,6 +120,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (* shard.WithReportErrorFunc(e.reportShardErrorBackground), shard.WithRebuildWorkerLimiter(e.rebuildLimiter), shard.WithZeroSizeCallback(e.processZeroSizeContainers), + shard.WithZeroCountCallback(e.processZeroCountContainers), )...) if err := sh.UpdateID(ctx); err != nil { diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index 271e78fe..915fbd95 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -120,7 +120,9 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) default: } - completed, err := db.containerCountersNextBatch(lastKey, &cc) + completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) { + cc.Counts[id] = entity + }) if err != nil { return cc, err } @@ -133,7 +135,7 @@ func (db *DB) ContainerCounters(ctx context.Context) (ContainerCounters, error) return cc, nil } -func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) (bool, error) { +func (db *DB) containerCountersNextBatch(lastKey []byte, f func(id cid.ID, entity ObjectCounters)) (bool, error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() @@ -165,7 +167,7 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) if err != nil { return err } - cc.Counts[cnrID] = ent + f(cnrID, ent) counter++ if counter == batchSize { @@ -187,6 +189,43 @@ func (db *DB) containerCountersNextBatch(lastKey []byte, cc *ContainerCounters) return false, nil } +func (db *DB) ContainerCount(ctx context.Context, id cid.ID) (ObjectCounters, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ContainerCount", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.ContainerCount") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ObjectCounters{}, ErrDegradedMode + } + + var result ObjectCounters + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(containerCounterBucketName) + key := make([]byte, cidSize) + id.Encode(key) + v := b.Get(key) + if v == nil { + return nil + } + var err error + result, err = parseContainerCounterValue(v) + return err + }) + + return result, metaerr.Wrap(err) +} + func (db *DB) incCounters(tx *bbolt.Tx, cnrID cid.ID, isUserObject bool) error { if err := db.updateShardObjectCounter(tx, phy, 1, true); err != nil { return fmt.Errorf("could not increase phy object counter: %w", err) @@ -270,9 +309,6 @@ func (*DB) editContainerCounterValue(b *bbolt.Bucket, key []byte, delta ObjectCo entity.Phy = nextValue(entity.Phy, delta.Phy, inc) entity.Logic = nextValue(entity.Logic, delta.Logic, inc) entity.User = nextValue(entity.User, delta.User, inc) - if entity.IsZero() { - return b.Delete(key) - } value := containerCounterValue(entity) return b.Put(key, value) } @@ -610,3 +646,86 @@ func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error { success = err == nil return metaerr.Wrap(err) } + +// ZeroCountContainers returns containers with objects count = 0 in metabase. +func (db *DB) ZeroCountContainers(ctx context.Context) ([]cid.ID, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ZeroCountContainers", time.Since(startedAt), success) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroCountContainers") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return nil, ErrDegradedMode + } + + var result []cid.ID + + lastKey := make([]byte, cidSize) + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + completed, err := db.containerCountersNextBatch(lastKey, func(id cid.ID, entity ObjectCounters) { + if entity.IsZero() { + result = append(result, id) + } + }) + if err != nil { + return nil, metaerr.Wrap(err) + } + if completed { + break + } + } + success = true + return result, nil +} + +func (db *DB) DeleteContainerCount(ctx context.Context, id cid.ID) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("DeleteContainerCount", time.Since(startedAt), success) + }() + + _, span := tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerCount", + trace.WithAttributes( + attribute.Stringer("container_id", id), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + if db.mode.ReadOnly() { + return ErrReadOnlyMode + } + + err := db.boltDB.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(containerCounterBucketName) + + key := make([]byte, cidSize) + id.Encode(key) + return b.Delete(key) + }) + success = err == nil + return metaerr.Wrap(err) +} diff --git a/pkg/local_object_storage/metabase/counter_test.go b/pkg/local_object_storage/metabase/counter_test.go index 2f40daaa..4b7b565b 100644 --- a/pkg/local_object_storage/metabase/counter_test.go +++ b/pkg/local_object_storage/metabase/counter_test.go @@ -105,11 +105,7 @@ func TestCounters(t *testing.T) { v.Phy-- v.Logic-- v.User-- - if v.IsZero() { - delete(exp, cnrID) - } else { - exp[cnrID] = v - } + exp[cnrID] = v } cc, err := db.ContainerCounters(context.Background()) @@ -428,11 +424,7 @@ func TestCounters_Expired(t *testing.T) { if v, ok := exp[oo[0].Container()]; ok { v.Phy-- - if v.IsZero() { - delete(exp, oo[0].Container()) - } else { - exp[oo[0].Container()] = v - } + exp[oo[0].Container()] = v } oo = oo[1:] @@ -463,11 +455,7 @@ func TestCounters_Expired(t *testing.T) { v.Phy-- v.Logic-- v.User-- - if v.IsZero() { - delete(exp, oo[0].Container()) - } else { - exp[oo[0].Container()] = v - } + exp[oo[0].Container()] = v } oo = oo[1:] diff --git a/pkg/local_object_storage/shard/container.go b/pkg/local_object_storage/shard/container.go index 85688548..364649b5 100644 --- a/pkg/local_object_storage/shard/container.go +++ b/pkg/local_object_storage/shard/container.go @@ -44,6 +44,43 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) { }, nil } +type ContainerCountPrm struct { + ContainerID cid.ID +} + +type ContainerCountRes struct { + Phy uint64 + Logic uint64 + User uint64 +} + +func (s *Shard) ContainerCount(ctx context.Context, prm ContainerCountPrm) (ContainerCountRes, error) { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ContainerCount", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.Stringer("container_id", prm.ContainerID), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.NoMetabase() { + return ContainerCountRes{}, ErrDegradedMode + } + + counters, err := s.metaBase.ContainerCount(ctx, prm.ContainerID) + if err != nil { + return ContainerCountRes{}, fmt.Errorf("could not get container counters: %w", err) + } + + return ContainerCountRes{ + Phy: counters.Phy, + Logic: counters.Logic, + User: counters.User, + }, nil +} + func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error { ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerSize", trace.WithAttributes( @@ -65,3 +102,25 @@ func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error { return s.metaBase.DeleteContainerSize(ctx, id) } + +func (s *Shard) DeleteContainerCount(ctx context.Context, id cid.ID) error { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerCount", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.Stringer("container_id", id), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.ReadOnly() { + return ErrReadOnlyMode + } + + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + + return s.metaBase.DeleteContainerCount(ctx, id) +} diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 3017dce1..bfd1f1b1 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -738,6 +738,7 @@ func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) { defer s.log.Debug(logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch)) s.collectExpiredContainerSizeMetrics(ctx, epoch) + s.collectExpiredContainerCountMetrics(ctx, epoch) } func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) { @@ -751,3 +752,15 @@ func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch ui } s.zeroSizeContainersCallback(ctx, ids) } + +func (s *Shard) collectExpiredContainerCountMetrics(ctx context.Context, epoch uint64) { + ids, err := s.metaBase.ZeroCountContainers(ctx) + if err != nil { + s.log.Warn(logs.ShardGCFailedToCollectZeroCountContainers, zap.Uint64("epoch", epoch), zap.Error(err)) + return + } + if len(ids) == 0 { + return + } + s.zeroCountContainersCallback(ctx, ids) +} diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index f59565cb..9c81c747 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -336,11 +336,7 @@ func TestCounters(t *testing.T) { v.Logic-- v.Phy-- v.User-- - if v.IsZero() { - delete(expected, cnr) - } else { - expected[cnr] = v - } + expected[cnr] = v } } require.Equal(t, expectedLogicalSizes, mm.containerSizes()) diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index a83445cf..13f7f689 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -53,8 +53,8 @@ type ExpiredObjectsCallback func(context.Context, uint64, []oid.Address) // DeletedLockCallback is a callback handling list of deleted LOCK objects. type DeletedLockCallback func(context.Context, []oid.Address) -// ZeroSizeContainersCallback is a callback hanfling list of zero-size containers. -type ZeroSizeContainersCallback func(context.Context, []cid.ID) +// EmptyContainersCallback is a callback hanfling list of zero-size and zero-count containers. +type EmptyContainersCallback func(context.Context, []cid.ID) // MetricsWriter is an interface that must store shard's metrics. type MetricsWriter interface { @@ -121,7 +121,8 @@ type cfg struct { deletedLockCallBack DeletedLockCallback - zeroSizeContainersCallback ZeroSizeContainersCallback + zeroSizeContainersCallback EmptyContainersCallback + zeroCountContainersCallback EmptyContainersCallback tsSource TombstoneSource @@ -134,12 +135,13 @@ type cfg struct { func defaultCfg() *cfg { return &cfg{ - rmBatchSize: 100, - log: &logger.Logger{Logger: zap.L()}, - gcCfg: defaultGCCfg(), - reportErrorFunc: func(string, string, error) {}, - rebuildLimiter: &noopRebuildLimiter{}, - zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, + rmBatchSize: 100, + log: &logger.Logger{Logger: zap.L()}, + gcCfg: defaultGCCfg(), + reportErrorFunc: func(string, string, error) {}, + rebuildLimiter: &noopRebuildLimiter{}, + zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, + zeroCountContainersCallback: func(context.Context, []cid.ID) {}, } } @@ -370,12 +372,19 @@ func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option { } // WithZeroSizeCallback returns option to set zero-size containers callback. -func WithZeroSizeCallback(cb ZeroSizeContainersCallback) Option { +func WithZeroSizeCallback(cb EmptyContainersCallback) Option { return func(c *cfg) { c.zeroSizeContainersCallback = cb } } +// WithZeroCountCallback returns option to set zero-count containers callback. +func WithZeroCountCallback(cb EmptyContainersCallback) Option { + return func(c *cfg) { + c.zeroCountContainersCallback = cb + } +} + func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 80e91204..4cc54247 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -12,6 +12,7 @@ type EngineMetrics interface { AddMethodDuration(method string, d time.Duration) AddToContainerSize(cnrID string, size int64) DeleteContainerSize(cnrID string) + DeleteContainerCount(cnrID string) IncErrorCounter(shardID string) ClearErrorCounter(shardID string) DeleteShardMetrics(shardID string) @@ -84,6 +85,10 @@ func (m *engineMetrics) DeleteContainerSize(cnrID string) { m.containerSize.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID}) } +func (m *engineMetrics) DeleteContainerCount(cnrID string) { + m.contObjCounter.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID}) +} + func (m *engineMetrics) AddToPayloadCounter(shardID string, size int64) { m.payloadSize.With(prometheus.Labels{shardIDLabel: shardID}).Add(float64(size)) }