diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 5a195f688..4661781f1 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -251,8 +251,10 @@ const ( ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode" ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode" ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel" - ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel" ShardEventProcessingInProgress = "event processing is in progress, skip the received" + ShardEventProcessingStart = "event processing is started" + ShardEventProcessingEnd = "event processing is completed" + ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel" ShardStopEventListenerByContext = "stop event listener by context" ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool" ShardGCIsStopped = "GC is stopped" diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 90d7afdd4..b94d85f31 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -158,7 +158,6 @@ func (s *Shard) Init(ctx context.Context) error { eventChan: make(chan Event), mEventHandler: map[eventType]*eventHandlers{ eventNewEpoch: { - cancelFunc: func() {}, handlers: []eventHandler{ s.collectExpiredLocks, s.collectExpiredObjects, diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index d605746e8..cb0b1ac39 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -3,6 +3,7 @@ package shard import ( "context" "sync" + "sync/atomic" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -61,11 +62,11 @@ func EventNewEpoch(e uint64) Event { type eventHandler func(context.Context, Event) type eventHandlers struct { - prevGroup sync.WaitGroup - - cancelFunc context.CancelFunc + wg sync.WaitGroup handlers []eventHandler + + runningHandlers atomic.Int32 } type gcRunResult struct { @@ -182,33 +183,36 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) { if !ok { return } - - v.cancelFunc() - v.prevGroup.Wait() - - var runCtx context.Context - runCtx, v.cancelFunc = context.WithCancel(ctx) - - v.prevGroup.Add(len(v.handlers)) - + if !v.runningHandlers.CompareAndSwap(0, int32(len(v.handlers))) { + gc.log.Warn(logs.ShardEventProcessingInProgress) + return + } + addAndLog := func(delta int32) { + if v.runningHandlers.Add(delta) == 0 { + gc.log.Debug(logs.ShardEventProcessingEnd) + } + } + gc.log.Debug(logs.ShardEventProcessingStart) for i := range v.handlers { select { case <-ctx.Done(): + addAndLog(int32(i - len(v.handlers))) return default: } h := v.handlers[i] - + v.wg.Add(1) err := gc.workerPool.Submit(func() { - defer v.prevGroup.Done() - h(runCtx, event) + h(ctx, event) + addAndLog(-1) + v.wg.Done() }) if err != nil { gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool, zap.String("error", err.Error()), ) - - v.prevGroup.Done() + addAndLog(-1) + v.wg.Done() } } } @@ -264,7 +268,12 @@ func (gc *gc) stop() { }) gc.log.Info(logs.ShardWaitingForGCWorkersToStop) + // Order is important here, at first we need to wait for listener ends, + // and then for handlers, which spawn inside the listener. gc.wg.Wait() + for _, h := range gc.mEventHandler { + h.wg.Wait() + } } // iterates over metabase and deletes objects