diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index 2b9bdd71b..898a4eef9 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -45,6 +45,8 @@ func initContainerService(_ context.Context, c *cfg) { c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { containerGRPC.RegisterContainerServiceServer(s, server) }) + + c.cfgObject.cfgLocalStorage.localStorage.SetContainerSource(cnrRdr) } func configureEACLAndContainerSources(c *cfg, client *cntClient.Client, cnrSrc containerCore.Source) (*morphContainerReader, *morphContainerWriter) { diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6ead83700..3075e66e9 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -557,4 +557,11 @@ const ( BlobovniczaSavingCountersToMetaFailed = "saving counters to blobovnicza's meta failed" ObjectRemovalFailureExistsInWritecache = "can't remove object: object must be flushed from writecache" FailedToReportStatusToSystemd = "failed to report status to systemd" + ShardGCCollectingExpiredMetricsStarted = "collecting expired metrics started" + ShardGCCollectingExpiredMetricsCompleted = "collecting expired metrics completed" + ShardGCFailedToCollectZeroSizeContainers = "failed to collect zero-size containers" + EngineFailedToCheckContainerAvailability = "failed to check container availability" + EngineFailedToGetContainerSize = "failed to get container size" + EngineFailedToDeleteContainerSize = "failed to delete container size" + EngineInterruptProcessingZeroSizeContainers = "interrupt processing zero-size containers" ) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 026a7c336..e03a08abc 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -7,12 +7,14 @@ import ( "sync/atomic" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + 
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "go.uber.org/zap" ) @@ -218,14 +220,18 @@ type cfg struct { lowMem bool rebuildWorkersCount uint32 + + containerSource atomic.Pointer[containerSource] } func defaultCfg() *cfg { - return &cfg{ + res := &cfg{ log: &logger.Logger{Logger: zap.L()}, shardPoolSize: 20, rebuildWorkersCount: 100, } + res.containerSource.Store(&containerSource{}) + return res } // New creates, initializes and returns new StorageEngine instance. @@ -288,3 +294,30 @@ func WithRebuildWorkersCount(count uint32) Option { c.rebuildWorkersCount = count } } + +// SetContainerSource sets container source. 
+func (e *StorageEngine) SetContainerSource(cs container.Source) { + e.containerSource.Store(&containerSource{cs: cs}) +} + +type containerSource struct { + cs container.Source +} + +func (s *containerSource) IsContainerAvailable(ctx context.Context, id cid.ID) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + + if s == nil || s.cs == nil { + return true, nil + } + + wasRemoved, err := container.WasRemoved(s.cs, id) + if err != nil { + return false, err + } + return !wasRemoved, nil +} diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 293746f70..6750adec2 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.opentelemetry.io/otel/attribute" @@ -259,3 +260,99 @@ func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.A } }) } + +func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid.ID) { + if len(ids) == 0 { + return + } + + idMap, err := e.selectNonExistedIDs(ctx, ids) + if err != nil { + return + } + + if len(idMap) == 0 { + return + } + + var failed bool + var prm shard.ContainerSizePrm + e.iterateOverUnsortedShards(func(sh hashedShard) bool { + select { + case <-ctx.Done(): + e.log.Info(logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err())) + failed = true + return true + default: + } + + var drop []cid.ID + for id := range idMap { + prm.SetContainerID(id) + s, err := sh.ContainerSize(prm) + if err != nil { + 
e.log.Warn(logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err)) + failed = true + return true + } + if s.Size() > 0 { + drop = append(drop, id) + } + } + for _, id := range drop { + delete(idMap, id) + } + + return len(idMap) == 0 + }) + + if failed || len(idMap) == 0 { + return + } + + e.iterateOverUnsortedShards(func(sh hashedShard) bool { + select { + case <-ctx.Done(): + e.log.Info(logs.EngineInterruptProcessingZeroSizeContainers, zap.Error(ctx.Err())) + failed = true + return true + default: + } + + for id := range idMap { + if err := sh.DeleteContainerSize(ctx, id); err != nil { + e.log.Warn(logs.EngineFailedToDeleteContainerSize, zap.Stringer("container_id", id), zap.Error(err)) + failed = true + return true + } + } + + return false + }) + + if failed { + return + } + + for id := range idMap { + e.metrics.DeleteContainerSize(id.EncodeToString()) + } +} + +func (e *StorageEngine) selectNonExistedIDs(ctx context.Context, ids []cid.ID) (map[cid.ID]struct{}, error) { + cs := e.containerSource.Load() + + idMap := make(map[cid.ID]struct{}) + for _, id := range ids { + isAvailable, err := cs.IsContainerAvailable(ctx, id) + if err != nil { + e.log.Warn(logs.EngineFailedToCheckContainerAvailability, zap.Stringer("container_id", id), zap.Error(err)) + return nil, err + } + if isAvailable { + continue + } + idMap[id] = struct{}{} + } + return idMap, nil +} diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index 363d098f6..e4cc504fa 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -16,6 +16,7 @@ type MetricRegister interface { SetMode(shardID string, mode mode.Mode) AddToContainerSize(cnrID string, size int64) + DeleteContainerSize(cnrID string) AddToPayloadCounter(shardID string, size int64) IncErrorCounter(shardID string) ClearErrorCounter(shardID string) diff --git a/pkg/local_object_storage/engine/shards.go 
b/pkg/local_object_storage/engine/shards.go index 7e3e85e81..cb3a4eca5 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -119,6 +119,7 @@ func (e *StorageEngine) createShard(ctx context.Context, opts []shard.Option) (* shard.WithDeletedLockCallback(e.processDeletedLocks), shard.WithReportErrorFunc(e.reportShardErrorBackground), shard.WithRebuildWorkerLimiter(e.rebuildLimiter), + shard.WithZeroSizeCallback(e.processZeroSizeContainers), )...) if err := sh.UpdateID(ctx); err != nil { diff --git a/pkg/local_object_storage/metabase/counter.go b/pkg/local_object_storage/metabase/counter.go index 581df7a59..271e78fe4 100644 --- a/pkg/local_object_storage/metabase/counter.go +++ b/pkg/local_object_storage/metabase/counter.go @@ -14,6 +14,8 @@ import ( objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var ( @@ -480,3 +482,131 @@ func IsUserObject(obj *objectSDK.Object) bool { (obj.SplitID() == nil || (hasParentID && len(obj.Children()) == 0)) } + +// ZeroSizeContainers returns containers with size = 0. 
+func (db *DB) ZeroSizeContainers(ctx context.Context) ([]cid.ID, error) { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("ZeroSizeContainers", time.Since(startedAt), success) + }() + + ctx, span := tracing.StartSpanFromContext(ctx, "metabase.ZeroSizeContainers") + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + var result []cid.ID + lastKey := make([]byte, cidSize) + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + completed, err := db.containerSizesNextBatch(lastKey, func(contID cid.ID, size uint64) { + if size == 0 { + result = append(result, contID) + } + }) + if err != nil { + return nil, err + } + if completed { + break + } + } + + success = true + return result, nil +} + +func (db *DB) containerSizesNextBatch(lastKey []byte, f func(cid.ID, uint64)) (bool, error) { + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return false, ErrDegradedMode + } + + counter := 0 + const batchSize = 1000 + + err := db.boltDB.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(containerVolumeBucketName) + c := b.Cursor() + var key, value []byte + for key, value = c.Seek(lastKey); key != nil; key, value = c.Next() { + if bytes.Equal(lastKey, key) { + continue + } + copy(lastKey, key) + + size := parseContainerSize(value) + var id cid.ID + if err := id.Decode(key); err != nil { + return err + } + f(id, size) + + counter++ + if counter == batchSize { + break + } + } + + if counter < batchSize { + return ErrInterruptIterator + } + return nil + }) + if err != nil { + if errors.Is(err, ErrInterruptIterator) { + return true, nil + } + return false, metaerr.Wrap(err) + } + return false, nil +} + +func (db *DB) DeleteContainerSize(ctx context.Context, id cid.ID) error { + var ( + startedAt = time.Now() + success = false + ) + defer func() { + db.metrics.AddMethodDuration("DeleteContainerSize", time.Since(startedAt), success) + }() + + _, span 
:= tracing.StartSpanFromContext(ctx, "metabase.DeleteContainerSize", + trace.WithAttributes( + attribute.Stringer("container_id", id), + )) + defer span.End() + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } + + if db.mode.ReadOnly() { + return ErrReadOnlyMode + } + + err := db.boltDB.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(containerVolumeBucketName) + + key := make([]byte, cidSize) + id.Encode(key) + return b.Delete(key) + }) + success = err == nil + return metaerr.Wrap(err) +} diff --git a/pkg/local_object_storage/shard/container.go b/pkg/local_object_storage/shard/container.go index 24090e8d8..856885486 100644 --- a/pkg/local_object_storage/shard/container.go +++ b/pkg/local_object_storage/shard/container.go @@ -1,9 +1,13 @@ package shard import ( + "context" "fmt" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type ContainerSizePrm struct { @@ -39,3 +43,25 @@ func (s *Shard) ContainerSize(prm ContainerSizePrm) (ContainerSizeRes, error) { size: size, }, nil } + +func (s *Shard) DeleteContainerSize(ctx context.Context, id cid.ID) error { + ctx, span := tracing.StartSpanFromContext(ctx, "Shard.DeleteContainerSize", + trace.WithAttributes( + attribute.String("shard_id", s.ID().String()), + attribute.Stringer("container_id", id), + )) + defer span.End() + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.ReadOnly() { + return ErrReadOnlyMode + } + + if s.info.Mode.NoMetabase() { + return ErrDegradedMode + } + + return s.metaBase.DeleteContainerSize(ctx, id) +} diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index f103ebc2b..574bc1430 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -155,6 +155,7 @@ func (s *Shard) Init(ctx 
context.Context) error { s.collectExpiredLocks, s.collectExpiredObjects, s.collectExpiredTombstones, + s.collectExpiredMetrics, }, }, }, diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index d82db82b9..3017dce1f 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -10,6 +10,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" @@ -726,3 +727,27 @@ func (s *Shard) HandleDeletedLocks(lockers []oid.Address) { func (s *Shard) NotificationChannel() chan<- Event { return s.gc.eventChan } + +func (s *Shard) collectExpiredMetrics(ctx context.Context, e Event) { + ctx, span := tracing.StartSpanFromContext(ctx, "shard.collectExpiredMetrics") + defer span.End() + + epoch := e.(newEpoch).epoch + + s.log.Debug(logs.ShardGCCollectingExpiredMetricsStarted, zap.Uint64("epoch", epoch)) + defer s.log.Debug(logs.ShardGCCollectingExpiredMetricsCompleted, zap.Uint64("epoch", epoch)) + + s.collectExpiredContainerSizeMetrics(ctx, epoch) +} + +func (s *Shard) collectExpiredContainerSizeMetrics(ctx context.Context, epoch uint64) { + ids, err := s.metaBase.ZeroSizeContainers(ctx) + if err != nil { + s.log.Warn(logs.ShardGCFailedToCollectZeroSizeContainers, zap.Uint64("epoch", epoch), zap.Error(err)) + return + } + if len(ids) == 0 { + return + } + s.zeroSizeContainersCallback(ctx, ids) +} diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 641b487ed..a83445cfe 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -53,6 +53,9 @@ type ExpiredObjectsCallback 
func(context.Context, uint64, []oid.Address) // DeletedLockCallback is a callback handling list of deleted LOCK objects. type DeletedLockCallback func(context.Context, []oid.Address) +// ZeroSizeContainersCallback is a callback handling list of zero-size containers. +type ZeroSizeContainersCallback func(context.Context, []cid.ID) + // MetricsWriter is an interface that must store shard's metrics. type MetricsWriter interface { // SetObjectCounter must set object counter taking into account object type. @@ -118,6 +121,8 @@ type cfg struct { deletedLockCallBack DeletedLockCallback + zeroSizeContainersCallback ZeroSizeContainersCallback + tsSource TombstoneSource metricsWriter MetricsWriter @@ -129,11 +134,12 @@ type cfg struct { func defaultCfg() *cfg { return &cfg{ - rmBatchSize: 100, - log: &logger.Logger{Logger: zap.L()}, - gcCfg: defaultGCCfg(), - reportErrorFunc: func(string, string, error) {}, - rebuildLimiter: &noopRebuildLimiter{}, + rmBatchSize: 100, + log: &logger.Logger{Logger: zap.L()}, + gcCfg: defaultGCCfg(), + reportErrorFunc: func(string, string, error) {}, + rebuildLimiter: &noopRebuildLimiter{}, + zeroSizeContainersCallback: func(context.Context, []cid.ID) {}, } } @@ -363,6 +369,13 @@ func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option { } } +// WithZeroSizeCallback returns option to set zero-size containers callback. 
+func WithZeroSizeCallback(cb ZeroSizeContainersCallback) Option { + return func(c *cfg) { + c.zeroSizeContainersCallback = cb + } +} + func (s *Shard) fillInfo() { s.cfg.info.MetaBaseInfo = s.metaBase.DumpInfo() s.cfg.info.BlobStorInfo = s.blobStor.DumpInfo() diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 609c30f97..80e912048 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -11,6 +11,7 @@ import ( type EngineMetrics interface { AddMethodDuration(method string, d time.Duration) AddToContainerSize(cnrID string, size int64) + DeleteContainerSize(cnrID string) IncErrorCounter(shardID string) ClearErrorCounter(shardID string) DeleteShardMetrics(shardID string) @@ -79,6 +80,10 @@ func (m *engineMetrics) AddToContainerSize(cnrID string, size int64) { m.containerSize.With(prometheus.Labels{containerIDLabelKey: cnrID}).Add(float64(size)) } +func (m *engineMetrics) DeleteContainerSize(cnrID string) { + m.containerSize.DeletePartialMatch(prometheus.Labels{containerIDLabelKey: cnrID}) +} + func (m *engineMetrics) AddToPayloadCounter(shardID string, size int64) { m.payloadSize.With(prometheus.Labels{shardIDLabel: shardID}).Add(float64(size)) }