node: Improve shutdown #930

Merged
fyrchik merged 2 commits from acid-ant/frostfs-node:bugfix/improve-shutdown into master 2024-01-31 08:30:35 +00:00
6 changed files with 52 additions and 18 deletions

View file

@ -1009,7 +1009,7 @@ func initLocalStorage(ctx context.Context, c *cfg) {
ls := engine.New(c.engineOpts()...) ls := engine.New(c.engineOpts()...)
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) { addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber()) ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
}) })
// allocate memory for the service; // allocate memory for the service;

View file

@ -270,7 +270,9 @@ const (
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode" ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode" ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
ShardTryingToRestoreReadwriteMode = "trying to restore read-write mode" ShardTryingToRestoreReadwriteMode = "trying to restore read-write mode"
ShardStopEventListenerByClosedChannel = "stop event listener by closed channel" ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
ShardStopEventListenerByContext = "stop event listener by context"
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool" ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
ShardGCIsStopped = "GC is stopped" ShardGCIsStopped = "GC is stopped"
ShardWaitingForGCWorkersToStop = "waiting for GC workers to stop..." ShardWaitingForGCWorkersToStop = "waiting for GC workers to stop..."

View file

@ -133,7 +133,7 @@ func TestLockUserScenario(t *testing.T) {
require.ErrorIs(t, err, meta.ErrLockObjectRemoval) require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
// 5. // 5.
e.HandleNewEpoch(lockerExpiresAfter + 1) e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
inhumePrm.WithTarget(tombAddr, objAddr) inhumePrm.WithTarget(tombAddr, objAddr)
@ -206,7 +206,7 @@ func TestLockExpiration(t *testing.T) {
require.ErrorAs(t, err, &objLockedErr) require.ErrorAs(t, err, &objLockedErr)
// 3. // 3.
e.HandleNewEpoch(lockerExpiresAfter + 1) e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
// 4. // 4.
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj)) inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))

View file

@ -329,14 +329,18 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
} }
// HandleNewEpoch notifies every shard about NewEpoch event. // HandleNewEpoch notifies every shard about NewEpoch event.
func (e *StorageEngine) HandleNewEpoch(epoch uint64) { func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
ev := shard.EventNewEpoch(epoch) ev := shard.EventNewEpoch(epoch)
e.mtx.RLock() e.mtx.RLock()
defer e.mtx.RUnlock() defer e.mtx.RUnlock()
for _, sh := range e.shards { for _, sh := range e.shards {
sh.NotificationChannel() <- ev select {
case <-ctx.Done():
return
case sh.NotificationChannel() <- ev:
}
} }
Review

Shouldn't it be select with both this and <-ctx.Done() branches?
A separate select for ctx.Done() is ok, because this way we guarantee early exit, though a single select is not much worse.

Shouldn't it be `select` with both this and `<-ctx.Done()` branches? A separate `select` for `ctx.Done()` is ok, because this way we guarantee early exit, though a single select is not much worse.
Review

Right, the idea was to exit earlier. And it looks like once NotificationChannel is buffered, we can use one select for this too statements.

Right, the idea was to exit earlier. And it looks like once `NotificationChannel` is buffered, we can use one `select` for this too statements.
} }

View file

@ -105,6 +105,8 @@ type gc struct {
remover func(context.Context) gcRunResult remover func(context.Context) gcRunResult
// eventChan is used only for listening for the new epoch event.
// It is ok to keep opened, we are listening for context done when writing in it.
eventChan chan Event eventChan chan Event
mEventHandler map[eventType]*eventHandlers mEventHandler map[eventType]*eventHandlers
} }
@ -155,15 +157,23 @@ func (gc *gc) listenEvents(ctx context.Context) {
defer gc.wg.Done() defer gc.wg.Done()
for { for {
event, ok := <-gc.eventChan select {
case <-gc.stopChannel:
gc.log.Warn(logs.ShardStopEventListenerByClosedStopChannel)
return
case <-ctx.Done():
gc.log.Warn(logs.ShardStopEventListenerByContext)
return
case event, ok := <-gc.eventChan:
if !ok { if !ok {
gc.log.Warn(logs.ShardStopEventListenerByClosedChannel) gc.log.Warn(logs.ShardStopEventListenerByClosedEventChannel)
return return
} }
gc.handleEvent(ctx, event) gc.handleEvent(ctx, event)
} }
} }
}
func (gc *gc) handleEvent(ctx context.Context, event Event) { func (gc *gc) handleEvent(ctx context.Context, event Event) {
v, ok := gc.mEventHandler[event.typ()] v, ok := gc.mEventHandler[event.typ()]
@ -180,6 +190,11 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
v.prevGroup.Add(len(v.handlers)) v.prevGroup.Add(len(v.handlers))
for i := range v.handlers { for i := range v.handlers {
select {
case <-ctx.Done():
return
default:
}
h := v.handlers[i] h := v.handlers[i]
err := gc.workerPool.Submit(func() { err := gc.workerPool.Submit(func() {
@ -196,6 +211,18 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
} }
} }
func (gc *gc) releaseResources() {
if gc.workerPool != nil {
gc.workerPool.Release()
}
// Avoid to close gc.eventChan here,
// because it is possible that we are close it earlier than stop writing.
// It is ok to keep it opened.
gc.log.Debug(logs.ShardGCIsStopped)
}
func (gc *gc) tickRemover(ctx context.Context) { func (gc *gc) tickRemover(ctx context.Context) {
defer gc.wg.Done() defer gc.wg.Done()
@ -204,14 +231,13 @@ func (gc *gc) tickRemover(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done():
// Context canceled earlier than we start to close shards.
// It make sense to stop collecting garbage by context too.
gc.releaseResources()
return
case <-gc.stopChannel: case <-gc.stopChannel:
if gc.workerPool != nil { gc.releaseResources()
gc.workerPool.Release()
}
close(gc.eventChan)
gc.log.Debug(logs.ShardGCIsStopped)
return return
case <-timer.C: case <-timer.C:
startedAt := time.Now() startedAt := time.Now()

View file

@ -19,6 +19,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
p.taskPool.Release()
return return
default: default:
} }
@ -36,6 +37,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
for i := range addrs { for i := range addrs {
select { select {
case <-ctx.Done(): case <-ctx.Done():
p.taskPool.Release()
return return
default: default:
addr := addrs[i] addr := addrs[i]