[#930] gc: Stop internal activity by context

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-01-25 11:02:57 +03:00 committed by Evgenii Stratonikov
parent c441296592
commit e3573de6db
5 changed files with 50 additions and 18 deletions

View file

@ -1009,7 +1009,7 @@ func initLocalStorage(ctx context.Context, c *cfg) {
ls := engine.New(c.engineOpts()...) ls := engine.New(c.engineOpts()...)
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber()) ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
}) })
// allocate memory for the service; // allocate memory for the service;

View file

@ -270,7 +270,9 @@ const (
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode" ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode" ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
ShardTryingToRestoreReadwriteMode = "trying to restore read-write mode" ShardTryingToRestoreReadwriteMode = "trying to restore read-write mode"
ShardStopEventListenerByClosedChannel = "stop event listener by closed channel" ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
ShardStopEventListenerByContext = "stop event listener by context"
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool" ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
ShardGCIsStopped = "GC is stopped" ShardGCIsStopped = "GC is stopped"
ShardWaitingForGCWorkersToStop = "waiting for GC workers to stop..." ShardWaitingForGCWorkersToStop = "waiting for GC workers to stop..."

View file

@ -133,7 +133,7 @@ func TestLockUserScenario(t *testing.T) {
require.ErrorIs(t, err, meta.ErrLockObjectRemoval) require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
// 5. // 5.
e.HandleNewEpoch(lockerExpiresAfter + 1) e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
inhumePrm.WithTarget(tombAddr, objAddr) inhumePrm.WithTarget(tombAddr, objAddr)
@ -206,7 +206,7 @@ func TestLockExpiration(t *testing.T) {
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
// 3. // 3.
e.HandleNewEpoch(lockerExpiresAfter + 1) e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
// 4. // 4.
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj)) inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))

View file

@ -329,14 +329,18 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
} }
// HandleNewEpoch notifies every shard about NewEpoch event. // HandleNewEpoch notifies every shard about NewEpoch event.
func (e *StorageEngine) HandleNewEpoch(epoch uint64) { func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
ev := shard.EventNewEpoch(epoch) ev := shard.EventNewEpoch(epoch)
e.mtx.RLock() e.mtx.RLock()
defer e.mtx.RUnlock() defer e.mtx.RUnlock()
for _, sh := range e.shards { for _, sh := range e.shards {
sh.NotificationChannel() <- ev select {
case <-ctx.Done():
return
case sh.NotificationChannel() <- ev:
}
} }
} }

View file

@ -105,6 +105,8 @@ type gc struct {
remover func(context.Context) gcRunResult remover func(context.Context) gcRunResult
// eventChan is used only for listening for the new epoch event.
// It is ok to keep opened, we are listening for context done when writing in it.
eventChan chan Event eventChan chan Event
mEventHandler map[eventType]*eventHandlers mEventHandler map[eventType]*eventHandlers
} }
@ -155,13 +157,21 @@ func (gc *gc) listenEvents(ctx context.Context) {
defer gc.wg.Done() defer gc.wg.Done()
for { for {
event, ok := <-gc.eventChan select {
if !ok { case <-gc.stopChannel:
gc.log.Warn(logs.ShardStopEventListenerByClosedChannel) gc.log.Warn(logs.ShardStopEventListenerByClosedStopChannel)
return return
} case <-ctx.Done():
gc.log.Warn(logs.ShardStopEventListenerByContext)
return
case event, ok := <-gc.eventChan:
if !ok {
gc.log.Warn(logs.ShardStopEventListenerByClosedEventChannel)
return
}
gc.handleEvent(ctx, event) gc.handleEvent(ctx, event)
}
} }
} }
@ -180,6 +190,11 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
v.prevGroup.Add(len(v.handlers)) v.prevGroup.Add(len(v.handlers))
for i := range v.handlers { for i := range v.handlers {
select {
case <-ctx.Done():
return
default:
}
h := v.handlers[i] h := v.handlers[i]
err := gc.workerPool.Submit(func() { err := gc.workerPool.Submit(func() {
@ -196,6 +211,18 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
} }
} }
func (gc *gc) releaseResources() {
if gc.workerPool != nil {
gc.workerPool.Release()
}
// Avoid to close gc.eventChan here,
// because it is possible that we are close it earlier than stop writing.
// It is ok to keep it opened.
gc.log.Debug(logs.ShardGCIsStopped)
}
func (gc *gc) tickRemover(ctx context.Context) { func (gc *gc) tickRemover(ctx context.Context) {
defer gc.wg.Done() defer gc.wg.Done()
@ -204,14 +231,13 @@ func (gc *gc) tickRemover(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done():
// Context canceled earlier than we start to close shards.
// It make sense to stop collecting garbage by context too.
gc.releaseResources()
return
case <-gc.stopChannel: case <-gc.stopChannel:
if gc.workerPool != nil { gc.releaseResources()
gc.workerPool.Release()
}
close(gc.eventChan)
gc.log.Debug(logs.ShardGCIsStopped)
return return
case <-timer.C: case <-timer.C:
startedAt := time.Now() startedAt := time.Now()