From 5296f64df04b883e7f78718616252c3e136b6776 Mon Sep 17 00:00:00 2001
From: Aleksey Savchuk
Date: Tue, 17 Dec 2024 16:06:27 +0300
Subject: [PATCH] [#1445] shard/gc: Collect objects of all types

Signed-off-by: Aleksey Savchuk
---
 internal/logs/logs.go                     |   1 +
 pkg/local_object_storage/shard/control.go |   1 -
 pkg/local_object_storage/shard/gc.go      | 153 +++++++++++-----------
 3 files changed, 80 insertions(+), 75 deletions(-)

diff --git a/internal/logs/logs.go b/internal/logs/logs.go
index b24f3593d..cd186f15b 100644
--- a/internal/logs/logs.go
+++ b/internal/logs/logs.go
@@ -254,6 +254,7 @@ const (
     ShardFailureToMarkLockersAsGarbage = "failure to mark lockers as garbage"
     ShardFailureToGetExpiredUnlockedObjects = "failure to get expired unlocked objects"
     ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase"
+    ShardUnknownObjectTypeWhileIteratingExpiredObjects = "encountered unknown object type while iterating expired objects"
     WritecacheWaitingForChannelsToFlush = "waiting for channels to flush"
     WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache"
     BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level"
diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go
index 61bd1253d..a9f5f0197 100644
--- a/pkg/local_object_storage/shard/control.go
+++ b/pkg/local_object_storage/shard/control.go
@@ -116,7 +116,6 @@ func (s *Shard) Init(ctx context.Context) error {
         eventNewEpoch: {
             cancelFunc: func() {},
             handlers: []eventHandler{
-                s.collectExpiredLockObjects,
                 s.collectExpiredObjects,
                 s.collectExpiredGraves,
                 s.collectExpiredMetrics,
diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go
index 3087ca1b1..622a0a28f 100644
--- a/pkg/local_object_storage/shard/gc.go
+++ b/pkg/local_object_storage/shard/gc.go
@@ -2,6 +2,7 @@ package shard
 
 import (
     "context"
+    "slices"
     "sync"
     "time"
 
@@ -14,6 +15,7 @@ import (
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     "go.uber.org/zap"
+    "golang.org/x/exp/maps"
     "golang.org/x/sync/errgroup"
 )
 
@@ -356,40 +358,61 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
         s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil)
     }()
 
-    s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
-    defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
+    epoch := e.(newEpoch).epoch
+
+    s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", epoch))
+    defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", epoch))
 
     workersCount, batchSize := s.getExpiredObjectsParameters()
 
     errGroup, egCtx := errgroup.WithContext(ctx)
     errGroup.SetLimit(workersCount)
 
-    errGroup.Go(func() error {
-        batch := make([]oid.Address, 0, batchSize)
-        expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
-            if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock {
-                batch = append(batch, o.Address())
+    handlers := map[objectSDK.Type]func(context.Context, []oid.Address){
+        objectSDK.TypeRegular:   s.handleExpiredObjects,
+        objectSDK.TypeTombstone: s.handleExpiredTombstones,
+        objectSDK.TypeLock: func(ctx context.Context, batch []oid.Address) {
+            s.expiredLockObjectsCallback(ctx, epoch, batch)
+        },
+    }
+    knownTypes := maps.Keys(handlers)
 
-                if len(batch) == batchSize {
-                    expired := batch
-                    errGroup.Go(func() error {
-                        s.handleExpiredObjects(egCtx, expired)
-                        return egCtx.Err()
-                    })
-                    batch = make([]oid.Address, 0, batchSize)
-                }
+    batches := make(map[objectSDK.Type][]oid.Address)
+    for _, typ := range knownTypes {
+        batches[typ] = make([]oid.Address, 0, batchSize)
+    }
+
+    errGroup.Go(func() error {
+        expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) {
+            typ := o.Type()
+
+            if !slices.Contains(knownTypes, typ) {
+                s.log.Warn(ctx, logs.ShardUnknownObjectTypeWhileIteratingExpiredObjects, zap.Stringer("type", typ))
+            }
+
+            batches[typ] = append(batches[typ], o.Address())
+
+            if len(batches[typ]) == batchSize {
+                expired := batches[typ]
+                errGroup.Go(func() error {
+                    handlers[typ](egCtx, expired)
+                    return egCtx.Err()
+                })
+                batches[typ] = make([]oid.Address, 0, batchSize)
             }
         })
         if expErr != nil {
             return expErr
         }
 
-        if len(batch) > 0 {
-            expired := batch
-            errGroup.Go(func() error {
-                s.handleExpiredObjects(egCtx, expired)
-                return egCtx.Err()
-            })
+        for typ, batch := range batches {
+            if len(batch) > 0 {
+                expired := batch
+                errGroup.Go(func() error {
+                    handlers[typ](egCtx, expired)
+                    return egCtx.Err()
+                })
+            }
         }
 
         return nil
@@ -400,6 +423,41 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
     }
 }
 
+func (s *Shard) handleExpiredTombstones(ctx context.Context, expired []oid.Address) {
+    select {
+    case <-ctx.Done():
+        return
+    default:
+    }
+
+    s.m.RLock()
+    defer s.m.RUnlock()
+
+    if s.info.Mode.NoMetabase() {
+        return
+    }
+
+    var inhumePrm meta.InhumePrm
+    inhumePrm.SetAddresses(expired...)
+    inhumePrm.SetGCMark()
+
+    res, err := s.metaBase.Inhume(ctx, inhumePrm)
+    if err != nil {
+        s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects, zap.Error(err))
+        return
+    }
+
+    s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeTombstone)
+    s.decObjectCounterBy(logical, res.LogicInhumed())
+    s.decObjectCounterBy(user, res.UserInhumed())
+    s.decContainerObjectCounter(res.InhumedByCnrID())
+
+    for i := range res.GetDeletionInfoLength() {
+        delInfo := res.GetDeletionInfoByIndex(i)
+        s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
+    }
+}
+
 func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) {
     select {
     case <-ctx.Done():
@@ -535,59 +593,6 @@ func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) {
     }
 }
 
-func (s *Shard) collectExpiredLockObjects(ctx context.Context, e Event) {
-    var err error
-    startedAt := time.Now()
-
-    defer func() {
-        s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil)
-    }()
-
-    s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch))
-    defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch))
-
-    workersCount, batchSize := s.getExpiredObjectsParameters()
-
-    errGroup, egCtx := errgroup.WithContext(ctx)
-    errGroup.SetLimit(workersCount)
-
-    errGroup.Go(func() error {
-        batch := make([]oid.Address, 0, batchSize)
-
-        expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) {
-            if o.Type() == objectSDK.TypeLock {
-                batch = append(batch, o.Address())
-
-                if len(batch) == batchSize {
-                    expired := batch
-                    errGroup.Go(func() error {
-                        s.expiredLockObjectsCallback(egCtx, e.(newEpoch).epoch, expired)
-                        return egCtx.Err()
-                    })
-                    batch = make([]oid.Address, 0, batchSize)
-                }
-            }
-        })
-        if expErr != nil {
-            return expErr
-        }
-
-        if len(batch) > 0 {
-            expired := batch
-            errGroup.Go(func() error {
-                s.expiredLockObjectsCallback(egCtx, e.(newEpoch).epoch, expired)
-                return egCtx.Err()
-            })
-        }
-
-        return nil
-    })
-
-    if err = errGroup.Wait(); err != nil {
-        s.log.Warn(ctx, logs.ShardIteratorOverExpiredLocksFailed, zap.Error(err))
-    }
-}
-
 func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*meta.ExpiredObject)) error {
     s.m.RLock()
     defer s.m.RUnlock()
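
For reference, the core of this change is the per-type batching in collectExpiredObjects: expired addresses are grouped by object type, every full batch is handed to the handler registered for that type, and leftovers are flushed after iteration. The standalone Go sketch below illustrates that pattern outside the shard code; every identifier in it (ObjectType, Address, ExpiredObject, dispatchExpired) is a placeholder invented for this sketch, not the repository's API, and unknown types are simply skipped here, whereas the patch logs ShardUnknownObjectTypeWhileIteratingExpiredObjects.

// Minimal, self-contained sketch of batching-by-type dispatch (illustrative only).
package main

import (
    "context"
    "fmt"
)

type ObjectType int

const (
    TypeRegular ObjectType = iota
    TypeTombstone
    TypeLock
)

type Address string

type ExpiredObject struct {
    Type ObjectType
    Addr Address
}

// dispatchExpired groups expired addresses into per-type batches and hands every
// full batch, plus the final remainders, to the handler registered for that type.
func dispatchExpired(ctx context.Context, expired []ExpiredObject, batchSize int,
    handlers map[ObjectType]func(context.Context, []Address),
) {
    batches := make(map[ObjectType][]Address)
    for typ := range handlers {
        batches[typ] = make([]Address, 0, batchSize)
    }

    for _, obj := range expired {
        handler, ok := handlers[obj.Type]
        if !ok {
            continue // unknown type: nothing registered to process it
        }
        batches[obj.Type] = append(batches[obj.Type], obj.Addr)
        if len(batches[obj.Type]) == batchSize {
            handler(ctx, batches[obj.Type])
            batches[obj.Type] = make([]Address, 0, batchSize)
        }
    }

    // Flush whatever is left over in every batch.
    for typ, batch := range batches {
        if len(batch) > 0 {
            handlers[typ](ctx, batch)
        }
    }
}

func main() {
    handlers := map[ObjectType]func(context.Context, []Address){
        TypeRegular:   func(_ context.Context, b []Address) { fmt.Println("regular:", b) },
        TypeTombstone: func(_ context.Context, b []Address) { fmt.Println("tombstone:", b) },
        TypeLock:      func(_ context.Context, b []Address) { fmt.Println("lock:", b) },
    }

    expired := []ExpiredObject{
        {TypeRegular, "cnr1/obj1"},
        {TypeRegular, "cnr1/obj2"},
        {TypeTombstone, "cnr2/ts1"},
        {TypeLock, "cnr3/lock1"},
    }
    dispatchExpired(context.Background(), expired, 2, handlers)
}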