diff --git a/cmd/frostfs-node/config.go b/cmd/frostfs-node/config.go index 1b3e094d9..e1e228325 100644 --- a/cmd/frostfs-node/config.go +++ b/cmd/frostfs-node/config.go @@ -1009,7 +1009,7 @@ func initLocalStorage(ctx context.Context, c *cfg) { ls := engine.New(c.engineOpts()...) addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { - ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber()) + ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber()) }) // allocate memory for the service; diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 34ca0ae93..e81976a32 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -270,7 +270,9 @@ const ( ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode" ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode" ShardTryingToRestoreReadwriteMode = "trying to restore read-write mode" - ShardStopEventListenerByClosedChannel = "stop event listener by closed channel" + ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel" + ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel" + ShardStopEventListenerByContext = "stop event listener by context" ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool" ShardGCIsStopped = "GC is stopped" ShardWaitingForGCWorkersToStop = "waiting for GC workers to stop..." diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index 69b74f4a4..7fa7c27ef 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -133,7 +133,7 @@ func TestLockUserScenario(t *testing.T) { require.ErrorIs(t, err, meta.ErrLockObjectRemoval) // 5. - e.HandleNewEpoch(lockerExpiresAfter + 1) + e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1) inhumePrm.WithTarget(tombAddr, objAddr) @@ -206,7 +206,7 @@ func TestLockExpiration(t *testing.T) { require.ErrorAs(t, err, &objLockedErr) // 3. - e.HandleNewEpoch(lockerExpiresAfter + 1) + e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1) // 4. inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj)) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 37a857c9d..bd25dde59 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -329,14 +329,18 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte } // HandleNewEpoch notifies every shard about NewEpoch event. -func (e *StorageEngine) HandleNewEpoch(epoch uint64) { +func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) { ev := shard.EventNewEpoch(epoch) e.mtx.RLock() defer e.mtx.RUnlock() for _, sh := range e.shards { - sh.NotificationChannel() <- ev + select { + case <-ctx.Done(): + return + case sh.NotificationChannel() <- ev: + } } } diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index bfd1f1b11..24c3a337a 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -105,6 +105,8 @@ type gc struct { remover func(context.Context) gcRunResult + // eventChan is used only for listening for the new epoch event. + // It is ok to keep opened, we are listening for context done when writing in it. eventChan chan Event mEventHandler map[eventType]*eventHandlers } @@ -155,13 +157,21 @@ func (gc *gc) listenEvents(ctx context.Context) { defer gc.wg.Done() for { - event, ok := <-gc.eventChan - if !ok { - gc.log.Warn(logs.ShardStopEventListenerByClosedChannel) + select { + case <-gc.stopChannel: + gc.log.Warn(logs.ShardStopEventListenerByClosedStopChannel) return - } + case <-ctx.Done(): + gc.log.Warn(logs.ShardStopEventListenerByContext) + return + case event, ok := <-gc.eventChan: + if !ok { + gc.log.Warn(logs.ShardStopEventListenerByClosedEventChannel) + return + } - gc.handleEvent(ctx, event) + gc.handleEvent(ctx, event) + } } } @@ -180,6 +190,11 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) { v.prevGroup.Add(len(v.handlers)) for i := range v.handlers { + select { + case <-ctx.Done(): + return + default: + } h := v.handlers[i] err := gc.workerPool.Submit(func() { @@ -196,6 +211,18 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) { } } +func (gc *gc) releaseResources() { + if gc.workerPool != nil { + gc.workerPool.Release() + } + + // Avoid to close gc.eventChan here, + // because it is possible that we are close it earlier than stop writing. + // It is ok to keep it opened. + + gc.log.Debug(logs.ShardGCIsStopped) +} + func (gc *gc) tickRemover(ctx context.Context) { defer gc.wg.Done() @@ -204,14 +231,13 @@ func (gc *gc) tickRemover(ctx context.Context) { for { select { + case <-ctx.Done(): + // Context canceled earlier than we start to close shards. + // It make sense to stop collecting garbage by context too. + gc.releaseResources() + return case <-gc.stopChannel: - if gc.workerPool != nil { - gc.workerPool.Release() - } - - close(gc.eventChan) - - gc.log.Debug(logs.ShardGCIsStopped) + gc.releaseResources() return case <-timer.C: startedAt := time.Now()