Compare commits

...

2 commits

Author SHA1 Message Date
1f2c34318f [#930] gc: Stop internal activity by context
All checks were successful
DCO action / DCO (pull_request) Successful in 3m45s
Build / Build Components (1.21) (pull_request) Successful in 4m4s
Build / Build Components (1.20) (pull_request) Successful in 4m10s
Vulncheck / Vulncheck (pull_request) Successful in 10m13s
Tests and linters / Tests (1.21) (pull_request) Successful in 10m40s
Tests and linters / Staticcheck (pull_request) Successful in 14m42s
Tests and linters / Lint (pull_request) Successful in 15m26s
Tests and linters / Tests (1.20) (pull_request) Successful in 2m49s
Tests and linters / Tests with -race (pull_request) Successful in 4m49s
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-01-30 10:14:00 +03:00
a1625b614e [#930] policer: Release task pool when context cancelled
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-01-30 10:14:00 +03:00
6 changed files with 52 additions and 18 deletions

View file

@ -1009,7 +1009,7 @@ func initLocalStorage(ctx context.Context, c *cfg) {
ls := engine.New(c.engineOpts()...)
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
})
// allocate memory for the service;

View file

@ -270,7 +270,9 @@ const (
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
ShardTryingToRestoreReadwriteMode = "trying to restore read-write mode"
ShardStopEventListenerByClosedChannel = "stop event listener by closed channel"
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
ShardStopEventListenerByContext = "stop event listener by context"
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
ShardGCIsStopped = "GC is stopped"
ShardWaitingForGCWorkersToStop = "waiting for GC workers to stop..."

View file

@ -133,7 +133,7 @@ func TestLockUserScenario(t *testing.T) {
require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
// 5.
e.HandleNewEpoch(lockerExpiresAfter + 1)
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
inhumePrm.WithTarget(tombAddr, objAddr)
@ -206,7 +206,7 @@ func TestLockExpiration(t *testing.T) {
require.ErrorAs(t, err, &objLockedErr)
// 3.
e.HandleNewEpoch(lockerExpiresAfter + 1)
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
// 4.
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))

View file

@ -329,14 +329,18 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
}
// HandleNewEpoch notifies every shard about NewEpoch event.
func (e *StorageEngine) HandleNewEpoch(epoch uint64) {
func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
ev := shard.EventNewEpoch(epoch)
e.mtx.RLock()
defer e.mtx.RUnlock()
for _, sh := range e.shards {
sh.NotificationChannel() <- ev
select {
case <-ctx.Done():
return
case sh.NotificationChannel() <- ev:
}
}
}

View file

@ -105,6 +105,8 @@ type gc struct {
remover func(context.Context) gcRunResult
// eventChan is used only for listening for the new epoch event.
// It is ok to keep opened, we are listening for context done when writing in it.
eventChan chan Event
mEventHandler map[eventType]*eventHandlers
}
@ -155,13 +157,21 @@ func (gc *gc) listenEvents(ctx context.Context) {
defer gc.wg.Done()
for {
event, ok := <-gc.eventChan
if !ok {
gc.log.Warn(logs.ShardStopEventListenerByClosedChannel)
select {
case <-gc.stopChannel:
gc.log.Warn(logs.ShardStopEventListenerByClosedStopChannel)
return
}
case <-ctx.Done():
gc.log.Warn(logs.ShardStopEventListenerByContext)
return
case event, ok := <-gc.eventChan:
if !ok {
gc.log.Warn(logs.ShardStopEventListenerByClosedEventChannel)
return
}
gc.handleEvent(ctx, event)
gc.handleEvent(ctx, event)
}
}
}
@ -180,6 +190,11 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
v.prevGroup.Add(len(v.handlers))
for i := range v.handlers {
select {
case <-ctx.Done():
return
default:
}
h := v.handlers[i]
err := gc.workerPool.Submit(func() {
@ -196,6 +211,18 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
}
}
func (gc *gc) releaseResources() {
if gc.workerPool != nil {
gc.workerPool.Release()
}
// Avoid to close gc.eventChan here,
// because it is possible that we are close it earlier than stop writing.
// It is ok to keep it opened.
gc.log.Debug(logs.ShardGCIsStopped)
}
func (gc *gc) tickRemover(ctx context.Context) {
defer gc.wg.Done()
@ -204,14 +231,13 @@ func (gc *gc) tickRemover(ctx context.Context) {
for {
select {
case <-ctx.Done():
// Context canceled earlier than we start to close shards.
// It make sense to stop collecting garbage by context too.
gc.releaseResources()
return
case <-gc.stopChannel:
if gc.workerPool != nil {
gc.workerPool.Release()
}
close(gc.eventChan)
gc.log.Debug(logs.ShardGCIsStopped)
gc.releaseResources()
return
case <-timer.C:
startedAt := time.Now()

View file

@ -19,6 +19,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
for {
select {
case <-ctx.Done():
p.taskPool.Release()
return
default:
}
@ -36,6 +37,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
for i := range addrs {
select {
case <-ctx.Done():
p.taskPool.Release()
return
default:
addr := addrs[i]