diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 6a72644e5..bc1c65196 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -253,6 +253,7 @@ const ( ShardFailureToMarkLockersAsGarbage = "failure to mark lockers as garbage" ShardFailureToGetExpiredUnlockedObjects = "failure to get expired unlocked objects" ShardCouldNotMarkObjectToDeleteInMetabase = "could not mark object to delete in metabase" + ShardUnknownObjectTypeWhileIteratingExpiredObjects = "encountered unknown object type while iterating expired objects" WritecacheWaitingForChannelsToFlush = "waiting for channels to flush" WritecacheCantRemoveObjectFromWritecache = "can't remove object from write-cache" BlobovniczatreeCouldNotGetObjectFromLevel = "could not get object from level" diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 0cca7277e..8ad35d794 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -116,7 +116,6 @@ func (s *Shard) Init(ctx context.Context) error { eventNewEpoch: { cancelFunc: func() {}, handlers: []eventHandler{ - s.collectExpiredLockObjects, s.collectExpiredObjects, s.collectExpiredGraves, s.collectExpiredMetrics, diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 6f53c1ab3..d0b39c5dc 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -358,40 +358,60 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil) }() - s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) - defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) + epoch := e.(newEpoch).epoch + + s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", epoch)) + defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", epoch)) workersCount, batchSize := s.getExpiredObjectsParameters() errGroup, egCtx := errgroup.WithContext(ctx) errGroup.SetLimit(workersCount) - errGroup.Go(func() error { - batch := make([]oid.Address, 0, batchSize) - expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { - if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock { - batch = append(batch, o.Address()) + handlers := map[objectSDK.Type]func(context.Context, []oid.Address){ + objectSDK.TypeRegular: s.handleExpiredObjects, + objectSDK.TypeTombstone: s.handleExpiredTombstones, + objectSDK.TypeLock: func(ctx context.Context, batch []oid.Address) { + s.expiredLockObjectsCallback(ctx, epoch, batch) + }, + } + batches := make(map[objectSDK.Type][]oid.Address) + for typ := range handlers { + batches[typ] = make([]oid.Address, 0, batchSize) + } - if len(batch) == batchSize { - expired := batch - errGroup.Go(func() error { - s.handleExpiredObjects(egCtx, expired) - return egCtx.Err() - }) - batch = make([]oid.Address, 0, batchSize) - } + errGroup.Go(func() error { + expErr := s.getExpiredObjects(egCtx, epoch, func(o *meta.ExpiredObject) { + typ := o.Type() + + if _, ok := handlers[typ]; !ok { + s.log.Warn(ctx, logs.ShardUnknownObjectTypeWhileIteratingExpiredObjects, zap.Stringer("type", typ)) + return + } + + batches[typ] = append(batches[typ], o.Address()) + + if len(batches[typ]) == batchSize { + expired := batches[typ] + errGroup.Go(func() error { + handlers[typ](egCtx, expired) + return egCtx.Err() + }) + batches[typ] = make([]oid.Address, 0, batchSize) } }) if expErr != nil { return expErr } - if len(batch) > 0 { - expired := batch - errGroup.Go(func() error { - s.handleExpiredObjects(egCtx, expired) - return egCtx.Err() - }) + for typ, batch := range batches { + if len(batch) > 0 { + expired := batch + errGroup.Go(func() error { + handlers[typ](egCtx, expired) + return egCtx.Err() + }) + } } return nil @@ -402,6 +422,41 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { } } +func (s *Shard) handleExpiredTombstones(ctx context.Context, expired []oid.Address) { + select { + case <-ctx.Done(): + return + default: + } + + s.m.RLock() + defer s.m.RUnlock() + + if s.info.Mode.NoMetabase() { + return + } + + var inhumePrm meta.InhumePrm + inhumePrm.SetAddresses(expired...) + inhumePrm.SetGCMark() + + res, err := s.metaBase.Inhume(ctx, inhumePrm) + if err != nil { + s.log.Warn(ctx, logs.ShardCouldNotInhumeTheObjects, zap.Error(err)) + return + } + + s.gc.metrics.AddInhumedObjectCount(res.LogicInhumed(), objectTypeTombstone) + s.decObjectCounterBy(logical, res.LogicInhumed()) + s.decObjectCounterBy(user, res.UserInhumed()) + s.decContainerObjectCounter(res.InhumedByCnrID()) + + for i := range res.GetDeletionInfoLength() { + delInfo := res.GetDeletionInfoByIndex(i) + s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size)) + } +} + func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) { select { case <-ctx.Done(): @@ -537,59 +592,6 @@ func (s *Shard) collectExpiredGraves(ctx context.Context, e Event) { } } -func (s *Shard) collectExpiredLockObjects(ctx context.Context, e Event) { - var err error - startedAt := time.Now() - - defer func() { - s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil) - }() - - s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) - defer s.log.Debug(ctx, logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) - - workersCount, batchSize := s.getExpiredObjectsParameters() - - errGroup, egCtx := errgroup.WithContext(ctx) - errGroup.SetLimit(workersCount) - - errGroup.Go(func() error { - batch := make([]oid.Address, 0, batchSize) - - expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { - if o.Type() == objectSDK.TypeLock { - batch = append(batch, o.Address()) - - if len(batch) == batchSize { - expired := batch - errGroup.Go(func() error { - s.expiredLockObjectsCallback(egCtx, e.(newEpoch).epoch, expired) - return egCtx.Err() - }) - batch = make([]oid.Address, 0, batchSize) - } - } - }) - if expErr != nil { - return expErr - } - - if len(batch) > 0 { - expired := batch - errGroup.Go(func() error { - s.expiredLockObjectsCallback(egCtx, e.(newEpoch).epoch, expired) - return egCtx.Err() - }) - } - - return nil - }) - - if err = errGroup.Wait(); err != nil { - s.log.Warn(ctx, logs.ShardIteratorOverExpiredLocksFailed, zap.Error(err)) - } -} - func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*meta.ExpiredObject)) error { s.m.RLock() defer s.m.RUnlock()