package shard import ( "context" "sync" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) const ( minExpiredWorkers = 2 minExpiredBatchSize = 1 ) // TombstoneSource is an interface that checks // tombstone status in the FrostFS network. type TombstoneSource interface { // IsTombstoneAvailable must return boolean value that means // provided tombstone's presence in the FrostFS network at the // time of the passed epoch. IsTombstoneAvailable(ctx context.Context, addr oid.Address, epoch uint64) bool } // Event represents class of external events. type Event interface { typ() eventType } type eventType int const ( _ eventType = iota eventNewEpoch ) type newEpoch struct { epoch uint64 } func (e newEpoch) typ() eventType { return eventNewEpoch } // EventNewEpoch returns new epoch event. func EventNewEpoch(e uint64) Event { return newEpoch{ epoch: e, } } type eventHandler func(context.Context, Event) type eventHandlers struct { prevGroup sync.WaitGroup cancelFunc context.CancelFunc handlers []eventHandler } type gcRunResult struct { success bool deleted uint64 failedToDelete uint64 } const ( objectTypeLock = "lock" objectTypeTombstone = "tombstone" objectTypeRegular = "regular" ) type GCMectrics interface { AddRunDuration(d time.Duration, success bool) AddDeletedCount(deleted, failed uint64) AddExpiredObjectCollectionDuration(d time.Duration, success bool, objectType string) AddInhumedObjectCount(count uint64, objectType string) } type noopGCMetrics struct{} func (m *noopGCMetrics) AddRunDuration(time.Duration, bool) {} func (m *noopGCMetrics) AddDeletedCount(uint64, uint64) {} func (m *noopGCMetrics) AddExpiredObjectCollectionDuration(time.Duration, bool, string) {} func (m *noopGCMetrics) AddInhumedObjectCount(uint64, string) {} type gc struct { *gcCfg onceStop sync.Once stopChannel chan struct{} wg sync.WaitGroup workerPool util.WorkerPool remover func(context.Context) gcRunResult eventChan chan Event mEventHandler map[eventType]*eventHandlers } type gcCfg struct { removerInterval time.Duration log *logger.Logger workerPoolInit func(int) util.WorkerPool expiredCollectorWorkerCount int expiredCollectorBatchSize int metrics GCMectrics testHookRemover func(ctx context.Context) gcRunResult } func defaultGCCfg() gcCfg { return gcCfg{ removerInterval: 10 * time.Second, log: &logger.Logger{Logger: zap.L()}, workerPoolInit: func(int) util.WorkerPool { return nil }, metrics: &noopGCMetrics{}, } } func (gc *gc) init(ctx context.Context) { sz := 0 for _, v := range gc.mEventHandler { sz += len(v.handlers) } if sz > 0 { gc.workerPool = gc.workerPoolInit(sz) } gc.wg.Add(2) go gc.tickRemover(ctx) go gc.listenEvents(ctx) } func (gc *gc) listenEvents(ctx context.Context) { defer gc.wg.Done() for { event, ok := <-gc.eventChan if !ok { gc.log.Warn(logs.ShardStopEventListenerByClosedChannel) return } gc.handleEvent(ctx, event) } } func (gc *gc) handleEvent(ctx context.Context, event Event) { v, ok := gc.mEventHandler[event.typ()] if !ok { return } v.cancelFunc() v.prevGroup.Wait() var runCtx context.Context runCtx, v.cancelFunc = context.WithCancel(ctx) v.prevGroup.Add(len(v.handlers)) for i := range v.handlers { h := v.handlers[i] err := gc.workerPool.Submit(func() { defer v.prevGroup.Done() h(runCtx, event) }) if err != nil { gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool, zap.String("error", err.Error()), ) v.prevGroup.Done() } } } func (gc *gc) tickRemover(ctx context.Context) { defer gc.wg.Done() timer := time.NewTimer(gc.removerInterval) defer timer.Stop() for { select { case <-gc.stopChannel: if gc.workerPool != nil { gc.workerPool.Release() } close(gc.eventChan) gc.log.Debug(logs.ShardGCIsStopped) return case <-timer.C: startedAt := time.Now() var result gcRunResult if gc.testHookRemover != nil { result = gc.testHookRemover(ctx) } else { result = gc.remover(ctx) } timer.Reset(gc.removerInterval) gc.metrics.AddRunDuration(time.Since(startedAt), result.success) gc.metrics.AddDeletedCount(result.deleted, result.failedToDelete) } } } func (gc *gc) stop() { gc.onceStop.Do(func() { close(gc.stopChannel) }) gc.log.Info(logs.ShardWaitingForGCWorkersToStop) gc.wg.Wait() } // iterates over metabase and deletes objects // with GC-marked graves. // Does nothing if shard is in "read-only" mode. func (s *Shard) removeGarbage(pctx context.Context) (result gcRunResult) { ctx, cancel := context.WithCancel(pctx) defer cancel() s.gcCancel.Store(cancel) if s.setModeRequested.Load() { return } s.m.RLock() defer s.m.RUnlock() if s.info.Mode != mode.ReadWrite { return } s.log.Debug(logs.ShardGCRemoveGarbageStarted) defer s.log.Debug(logs.ShardGCRemoveGarbageCompleted) buf := make([]oid.Address, 0, s.rmBatchSize) var iterPrm meta.GarbageIterationPrm iterPrm.SetHandler(func(g meta.GarbageObject) error { select { case <-ctx.Done(): return ctx.Err() default: } buf = append(buf, g.Address()) if len(buf) == s.rmBatchSize { return meta.ErrInterruptIterator } return nil }) // iterate over metabase's objects with GC mark // (no more than s.rmBatchSize objects) err := s.metaBase.IterateOverGarbage(ctx, iterPrm) if err != nil { s.log.Warn(logs.ShardIteratorOverMetabaseGraveyardFailed, zap.String("error", err.Error()), ) return } else if len(buf) == 0 { result.success = true return } var deletePrm DeletePrm deletePrm.SetAddresses(buf...) // delete accumulated objects res, err := s.delete(ctx, deletePrm, true) result.deleted = res.deleted result.failedToDelete = uint64(len(buf)) - res.deleted result.success = true if err != nil { s.log.Warn(logs.ShardCouldNotDeleteTheObjects, zap.String("error", err.Error()), ) result.success = false } return } func (s *Shard) getExpiredObjectsParameters() (workerCount, batchSize int) { workerCount = minExpiredWorkers batchSize = minExpiredBatchSize if s.gc.gcCfg.expiredCollectorBatchSize > batchSize { batchSize = s.gc.gcCfg.expiredCollectorBatchSize } if s.gc.gcCfg.expiredCollectorWorkerCount > workerCount { workerCount = s.gc.gcCfg.expiredCollectorWorkerCount } return } func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) { var err error startedAt := time.Now() defer func() { s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeRegular) }() s.log.Debug(logs.ShardGCCollectingExpiredObjectsStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) defer s.log.Debug(logs.ShardGCCollectingExpiredObjectsCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) workersCount, batchSize := s.getExpiredObjectsParameters() errGroup, egCtx := errgroup.WithContext(ctx) errGroup.SetLimit(workersCount) errGroup.Go(func() error { batch := make([]oid.Address, 0, batchSize) expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { if o.Type() != objectSDK.TypeTombstone && o.Type() != objectSDK.TypeLock { batch = append(batch, o.Address()) if len(batch) == batchSize { expired := batch errGroup.Go(func() error { s.handleExpiredObjects(egCtx, expired) return egCtx.Err() }) batch = make([]oid.Address, 0, batchSize) } } }) if expErr != nil { return expErr } if len(batch) > 0 { expired := batch errGroup.Go(func() error { s.handleExpiredObjects(egCtx, expired) return egCtx.Err() }) } return nil }) if err = errGroup.Wait(); err != nil { s.log.Warn(logs.ShardIteratorOverExpiredObjectsFailed, zap.String("error", err.Error())) } } func (s *Shard) handleExpiredObjects(ctx context.Context, expired []oid.Address) { select { case <-ctx.Done(): return default: } s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return } expired, err := s.getExpiredWithLinked(ctx, expired) if err != nil { s.log.Warn(logs.ShardGCFailedToGetExpiredWithLinked, zap.Error(err)) return } var inhumePrm meta.InhumePrm inhumePrm.SetAddresses(expired...) inhumePrm.SetGCMark() // inhume the collected objects res, err := s.metaBase.Inhume(ctx, inhumePrm) if err != nil { s.log.Warn(logs.ShardCouldNotInhumeTheObjects, zap.String("error", err.Error()), ) return } s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeRegular) s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 for i < res.GetDeletionInfoLength() { delInfo := res.GetDeletionInfoByIndex(i) s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size)) i++ } } func (s *Shard) getExpiredWithLinked(ctx context.Context, source []oid.Address) ([]oid.Address, error) { result := make([]oid.Address, 0, len(source)) parentToChildren, err := s.metaBase.GetChildren(ctx, source) if err != nil { return nil, err } for parent, children := range parentToChildren { result = append(result, parent) result = append(result, children...) } return result, nil } func (s *Shard) collectExpiredTombstones(ctx context.Context, e Event) { var err error startedAt := time.Now() defer func() { s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeTombstone) }() epoch := e.(newEpoch).epoch log := s.log.With(zap.Uint64("epoch", epoch)) log.Debug(logs.ShardStartedExpiredTombstonesHandling) defer log.Debug(logs.ShardFinishedExpiredTombstonesHandling) const tssDeleteBatch = 50 tss := make([]meta.TombstonedObject, 0, tssDeleteBatch) tssExp := make([]meta.TombstonedObject, 0, tssDeleteBatch) var iterPrm meta.GraveyardIterationPrm iterPrm.SetHandler(func(deletedObject meta.TombstonedObject) error { tss = append(tss, deletedObject) if len(tss) == tssDeleteBatch { return meta.ErrInterruptIterator } return nil }) for { log.Debug(logs.ShardIteratingTombstones) s.m.RLock() if s.info.Mode.NoMetabase() { s.log.Debug(logs.ShardShardIsInADegradedModeSkipCollectingExpiredTombstones) s.m.RUnlock() return } err = s.metaBase.IterateOverGraveyard(ctx, iterPrm) if err != nil { log.Error(logs.ShardIteratorOverGraveyardFailed, zap.Error(err)) s.m.RUnlock() return } s.m.RUnlock() tssLen := len(tss) if tssLen == 0 { break } for _, ts := range tss { if !s.tsSource.IsTombstoneAvailable(ctx, ts.Tombstone(), epoch) { tssExp = append(tssExp, ts) } } log.Debug(logs.ShardHandlingExpiredTombstonesBatch, zap.Int("number", len(tssExp))) s.expiredTombstonesCallback(ctx, tssExp) iterPrm.SetOffset(tss[tssLen-1].Address()) tss = tss[:0] tssExp = tssExp[:0] } } func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) { var err error startedAt := time.Now() defer func() { s.gc.metrics.AddExpiredObjectCollectionDuration(time.Since(startedAt), err == nil, objectTypeLock) }() s.log.Debug(logs.ShardGCCollectingExpiredLocksStarted, zap.Uint64("epoch", e.(newEpoch).epoch)) defer s.log.Debug(logs.ShardGCCollectingExpiredLocksCompleted, zap.Uint64("epoch", e.(newEpoch).epoch)) workersCount, batchSize := s.getExpiredObjectsParameters() errGroup, egCtx := errgroup.WithContext(ctx) errGroup.SetLimit(workersCount) errGroup.Go(func() error { batch := make([]oid.Address, 0, batchSize) expErr := s.getExpiredObjects(egCtx, e.(newEpoch).epoch, func(o *meta.ExpiredObject) { if o.Type() == objectSDK.TypeLock { batch = append(batch, o.Address()) if len(batch) == batchSize { expired := batch errGroup.Go(func() error { s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired) return egCtx.Err() }) batch = make([]oid.Address, 0, batchSize) } } }) if expErr != nil { return expErr } if len(batch) > 0 { expired := batch errGroup.Go(func() error { s.expiredLocksCallback(egCtx, e.(newEpoch).epoch, expired) return egCtx.Err() }) } return nil }) if err = errGroup.Wait(); err != nil { s.log.Warn(logs.ShardIteratorOverExpiredLocksFailed, zap.String("error", err.Error())) } } func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, onExpiredFound func(*meta.ExpiredObject)) error { s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return ErrDegradedMode } err := s.metaBase.IterateExpired(ctx, epoch, func(expiredObject *meta.ExpiredObject) error { select { case <-ctx.Done(): return meta.ErrInterruptIterator default: onExpiredFound(expiredObject) return nil } }) if err != nil { return err } return ctx.Err() } func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid.Address) ([]oid.Address, error) { s.m.RLock() defer s.m.RUnlock() if s.info.Mode.NoMetabase() { return nil, ErrDegradedMode } return s.metaBase.FilterExpired(ctx, epoch, addresses) } // HandleExpiredTombstones marks tombstones themselves as garbage // and clears up corresponding graveyard records. // // Does not modify tss. func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) { if s.GetMode().NoMetabase() { return } // Mark tombstones as garbage. var pInhume meta.InhumePrm tsAddrs := make([]oid.Address, 0, len(tss)) for _, ts := range tss { tsAddrs = append(tsAddrs, ts.Tombstone()) } pInhume.SetGCMark() pInhume.SetAddresses(tsAddrs...) // inhume tombstones res, err := s.metaBase.Inhume(ctx, pInhume) if err != nil { s.log.Warn(logs.ShardCouldNotMarkTombstonesAsGarbage, zap.String("error", err.Error()), ) return } s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeTombstone) s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 for i < res.GetDeletionInfoLength() { delInfo := res.GetDeletionInfoByIndex(i) s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size)) i++ } // drop just processed expired tombstones // from graveyard err = s.metaBase.DropGraves(ctx, tss) if err != nil { s.log.Warn(logs.ShardCouldNotDropExpiredGraveRecords, zap.Error(err)) } } // HandleExpiredLocks unlocks all objects which were locked by lockers. // If successful, marks lockers themselves as garbage. func (s *Shard) HandleExpiredLocks(ctx context.Context, epoch uint64, lockers []oid.Address) { if s.GetMode().NoMetabase() { return } unlocked, err := s.metaBase.FreeLockedBy(lockers) if err != nil { s.log.Warn(logs.ShardFailureToUnlockObjects, zap.String("error", err.Error()), ) return } var pInhume meta.InhumePrm pInhume.SetAddresses(lockers...) pInhume.SetForceGCMark() res, err := s.metaBase.Inhume(ctx, pInhume) if err != nil { s.log.Warn(logs.ShardFailureToMarkLockersAsGarbage, zap.String("error", err.Error()), ) return } s.gc.metrics.AddInhumedObjectCount(res.AvailableInhumed(), objectTypeLock) s.decObjectCounterBy(logical, res.AvailableInhumed()) s.decContainerObjectCounter(res.InhumedByCnrID()) i := 0 for i < res.GetDeletionInfoLength() { delInfo := res.GetDeletionInfoByIndex(i) s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size)) i++ } s.inhumeUnlockedIfExpired(ctx, epoch, unlocked) } func (s *Shard) inhumeUnlockedIfExpired(ctx context.Context, epoch uint64, unlocked []oid.Address) { expiredUnlocked, err := s.selectExpired(ctx, epoch, unlocked) if err != nil { s.log.Warn(logs.ShardFailureToGetExpiredUnlockedObjects, zap.Error(err)) return } if len(expiredUnlocked) == 0 { return } s.handleExpiredObjects(ctx, expiredUnlocked) } // HandleDeletedLocks unlocks all objects which were locked by lockers. func (s *Shard) HandleDeletedLocks(lockers []oid.Address) { if s.GetMode().NoMetabase() { return } _, err := s.metaBase.FreeLockedBy(lockers) if err != nil { s.log.Warn(logs.ShardFailureToUnlockObjects, zap.String("error", err.Error()), ) return } } // NotificationChannel returns channel for shard events. func (s *Shard) NotificationChannel() chan<- Event { return s.gc.eventChan }