node: Improve shutdown #930
6 changed files with 52 additions and 18 deletions
@@ -1009,7 +1009,7 @@ func initLocalStorage(ctx context.Context, c *cfg) {
 	ls := engine.New(c.engineOpts()...)

 	addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
-		ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
+		ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
 	})

 	// allocate memory for the service;
@@ -270,7 +270,9 @@ const (
 	ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
 	ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
 	ShardTryingToRestoreReadwriteMode = "trying to restore read-write mode"
-	ShardStopEventListenerByClosedChannel = "stop event listener by closed channel"
+	ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
+	ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
+	ShardStopEventListenerByContext = "stop event listener by context"
 	ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
 	ShardGCIsStopped = "GC is stopped"
 	ShardWaitingForGCWorkersToStop = "waiting for GC workers to stop..."
@@ -133,7 +133,7 @@ func TestLockUserScenario(t *testing.T) {
 	require.ErrorIs(t, err, meta.ErrLockObjectRemoval)

 	// 5.
-	e.HandleNewEpoch(lockerExpiresAfter + 1)
+	e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)

 	inhumePrm.WithTarget(tombAddr, objAddr)
@@ -206,7 +206,7 @@ func TestLockExpiration(t *testing.T) {
 	require.ErrorAs(t, err, &objLockedErr)

 	// 3.
-	e.HandleNewEpoch(lockerExpiresAfter + 1)
+	e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)

 	// 4.
 	inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
@@ -329,14 +329,18 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
 }

 // HandleNewEpoch notifies every shard about NewEpoch event.
-func (e *StorageEngine) HandleNewEpoch(epoch uint64) {
+func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
 	ev := shard.EventNewEpoch(epoch)

 	e.mtx.RLock()
 	defer e.mtx.RUnlock()

 	for _, sh := range e.shards {
-		sh.NotificationChannel() <- ev
+		select {
+		case <-ctx.Done():
+			return
+		case sh.NotificationChannel() <- ev:
+		}
 	}
 }
@@ -105,6 +105,8 @@ type gc struct {

 	remover func(context.Context) gcRunResult

+	// eventChan is used only for listening for the new epoch event.
+	// It is ok to keep opened, we are listening for context done when writing in it.
 	eventChan     chan Event
 	mEventHandler map[eventType]*eventHandlers
 }
@@ -155,13 +157,21 @@ func (gc *gc) listenEvents(ctx context.Context) {
 	defer gc.wg.Done()

 	for {
-		event, ok := <-gc.eventChan
-		if !ok {
-			gc.log.Warn(logs.ShardStopEventListenerByClosedChannel)
+		select {
+		case <-gc.stopChannel:
+			gc.log.Warn(logs.ShardStopEventListenerByClosedStopChannel)
 			return
-		}
+		case <-ctx.Done():
+			gc.log.Warn(logs.ShardStopEventListenerByContext)
+			return
+		case event, ok := <-gc.eventChan:
+			if !ok {
+				gc.log.Warn(logs.ShardStopEventListenerByClosedEventChannel)
+				return
+			}

-		gc.handleEvent(ctx, event)
+			gc.handleEvent(ctx, event)
+		}
 	}
 }
@@ -180,6 +190,11 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
 	v.prevGroup.Add(len(v.handlers))

 	for i := range v.handlers {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
 		h := v.handlers[i]

 		err := gc.workerPool.Submit(func() {
@@ -196,6 +211,18 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
 	}
 }

+func (gc *gc) releaseResources() {
+	if gc.workerPool != nil {
+		gc.workerPool.Release()
+	}
+
+	// Avoid to close gc.eventChan here,
+	// because it is possible that we are close it earlier than stop writing.
+	// It is ok to keep it opened.
+
+	gc.log.Debug(logs.ShardGCIsStopped)
+}
+
 func (gc *gc) tickRemover(ctx context.Context) {
 	defer gc.wg.Done()
@@ -204,14 +231,13 @@ func (gc *gc) tickRemover(ctx context.Context) {

 	for {
 		select {
+		case <-ctx.Done():
+			// Context canceled earlier than we start to close shards.
+			// It make sense to stop collecting garbage by context too.
+			gc.releaseResources()
+			return
 		case <-gc.stopChannel:
-			if gc.workerPool != nil {
-				gc.workerPool.Release()
-			}
-
-			close(gc.eventChan)
-
-			gc.log.Debug(logs.ShardGCIsStopped)
+			gc.releaseResources()
 			return
 		case <-timer.C:
 			startedAt := time.Now()
@@ -19,6 +19,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
 	for {
 		select {
 		case <-ctx.Done():
+			p.taskPool.Release()
 			return
 		default:
 		}
@@ -36,6 +37,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
 		for i := range addrs {
 			select {
 			case <-ctx.Done():
+				p.taskPool.Release()
 				return
 			default:
 				addr := addrs[i]
Shouldn't it be a `select` with both this and the `<-ctx.Done()` branches?

A separate `select` for `ctx.Done()` is ok, because this way we guarantee early exit, though a single select is not much worse.

Right, the idea was to exit earlier. And it looks like once `NotificationChannel` is buffered, we can use one `select` for these two statements.
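
For illustration, a minimal self-contained sketch of the two patterns the thread compares: a separate non-blocking `ctx.Done()` check before a blocking step, versus a single `select` over a buffered channel. The names (`notifyCh`, `separateSelect`, `singleSelect`) are invented for the example and are not part of this change.

package main

import (
	"context"
	"fmt"
	"time"
)

// separateSelect mirrors the pattern used in the shard GC and policer above:
// a non-blocking ctx check first, then the blocking operation. A canceled
// context always wins, even if the send below could proceed immediately.
func separateSelect(ctx context.Context, notifyCh chan<- int, v int) bool {
	select {
	case <-ctx.Done():
		return false
	default:
	}
	notifyCh <- v
	return true
}

// singleSelect folds both cases into one select, as suggested for a buffered
// notification channel: the send usually does not block, and cancellation is
// still observed while it would.
func singleSelect(ctx context.Context, notifyCh chan<- int, v int) bool {
	select {
	case <-ctx.Done():
		return false
	case notifyCh <- v:
		return true
	}
}

func main() {
	notifyCh := make(chan int, 1) // buffered, as proposed in the review

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	fmt.Println(separateSelect(ctx, notifyCh, 1)) // true: ctx alive, buffer has room
	fmt.Println(singleSelect(ctx, notifyCh, 2))   // false: buffer full, ctx expires first

	fmt.Println(<-notifyCh) // drains the value sent by separateSelect
}

When several cases of one `select` are ready, Go picks one pseudo-randomly, so only the separate check gives the strict early-exit guarantee mentioned above; the single `select` trades that for less code.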