Compare commits
2 commits
master
...
bugfix/imp
Author | SHA1 | Date | |
---|---|---|---|
1f2c34318f | |||
a1625b614e |
6 changed files with 52 additions and 18 deletions
|
@ -1009,7 +1009,7 @@ func initLocalStorage(ctx context.Context, c *cfg) {
|
|||
ls := engine.New(c.engineOpts()...)
|
||||
|
||||
addNewEpochAsyncNotificationHandler(c, func(ev event.Event) {
|
||||
ls.HandleNewEpoch(ev.(netmap2.NewEpoch).EpochNumber())
|
||||
ls.HandleNewEpoch(ctx, ev.(netmap2.NewEpoch).EpochNumber())
|
||||
})
|
||||
|
||||
// allocate memory for the service;
|
||||
|
|
|
@ -270,7 +270,9 @@ const (
|
|||
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
|
||||
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
|
||||
ShardTryingToRestoreReadwriteMode = "trying to restore read-write mode"
|
||||
ShardStopEventListenerByClosedChannel = "stop event listener by closed channel"
|
||||
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
|
||||
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
|
||||
ShardStopEventListenerByContext = "stop event listener by context"
|
||||
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
|
||||
ShardGCIsStopped = "GC is stopped"
|
||||
ShardWaitingForGCWorkersToStop = "waiting for GC workers to stop..."
|
||||
|
|
|
@ -133,7 +133,7 @@ func TestLockUserScenario(t *testing.T) {
|
|||
require.ErrorIs(t, err, meta.ErrLockObjectRemoval)
|
||||
|
||||
// 5.
|
||||
e.HandleNewEpoch(lockerExpiresAfter + 1)
|
||||
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
|
||||
|
||||
inhumePrm.WithTarget(tombAddr, objAddr)
|
||||
|
||||
|
@ -206,7 +206,7 @@ func TestLockExpiration(t *testing.T) {
|
|||
require.ErrorAs(t, err, &objLockedErr)
|
||||
|
||||
// 3.
|
||||
e.HandleNewEpoch(lockerExpiresAfter + 1)
|
||||
e.HandleNewEpoch(context.Background(), lockerExpiresAfter+1)
|
||||
|
||||
// 4.
|
||||
inhumePrm.WithTarget(oidtest.Address(), objectcore.AddressOf(obj))
|
||||
|
|
|
@ -329,14 +329,18 @@ func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounte
|
|||
}
|
||||
|
||||
// HandleNewEpoch notifies every shard about NewEpoch event.
|
||||
func (e *StorageEngine) HandleNewEpoch(epoch uint64) {
|
||||
func (e *StorageEngine) HandleNewEpoch(ctx context.Context, epoch uint64) {
|
||||
ev := shard.EventNewEpoch(epoch)
|
||||
|
||||
e.mtx.RLock()
|
||||
defer e.mtx.RUnlock()
|
||||
|
||||
for _, sh := range e.shards {
|
||||
sh.NotificationChannel() <- ev
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case sh.NotificationChannel() <- ev:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -105,6 +105,8 @@ type gc struct {
|
|||
|
||||
remover func(context.Context) gcRunResult
|
||||
|
||||
// eventChan is used only for listening for the new epoch event.
|
||||
// It is ok to keep opened, we are listening for context done when writing in it.
|
||||
eventChan chan Event
|
||||
mEventHandler map[eventType]*eventHandlers
|
||||
}
|
||||
|
@ -155,13 +157,21 @@ func (gc *gc) listenEvents(ctx context.Context) {
|
|||
defer gc.wg.Done()
|
||||
|
||||
for {
|
||||
event, ok := <-gc.eventChan
|
||||
if !ok {
|
||||
gc.log.Warn(logs.ShardStopEventListenerByClosedChannel)
|
||||
select {
|
||||
case <-gc.stopChannel:
|
||||
gc.log.Warn(logs.ShardStopEventListenerByClosedStopChannel)
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
gc.log.Warn(logs.ShardStopEventListenerByContext)
|
||||
return
|
||||
case event, ok := <-gc.eventChan:
|
||||
if !ok {
|
||||
gc.log.Warn(logs.ShardStopEventListenerByClosedEventChannel)
|
||||
return
|
||||
}
|
||||
|
||||
gc.handleEvent(ctx, event)
|
||||
gc.handleEvent(ctx, event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -180,6 +190,11 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
|
|||
v.prevGroup.Add(len(v.handlers))
|
||||
|
||||
for i := range v.handlers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
h := v.handlers[i]
|
||||
|
||||
err := gc.workerPool.Submit(func() {
|
||||
|
@ -196,6 +211,18 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
|
|||
}
|
||||
}
|
||||
|
||||
func (gc *gc) releaseResources() {
|
||||
if gc.workerPool != nil {
|
||||
gc.workerPool.Release()
|
||||
}
|
||||
|
||||
// Avoid to close gc.eventChan here,
|
||||
// because it is possible that we are close it earlier than stop writing.
|
||||
// It is ok to keep it opened.
|
||||
|
||||
gc.log.Debug(logs.ShardGCIsStopped)
|
||||
}
|
||||
|
||||
func (gc *gc) tickRemover(ctx context.Context) {
|
||||
defer gc.wg.Done()
|
||||
|
||||
|
@ -204,14 +231,13 @@ func (gc *gc) tickRemover(ctx context.Context) {
|
|||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Context canceled earlier than we start to close shards.
|
||||
// It make sense to stop collecting garbage by context too.
|
||||
gc.releaseResources()
|
||||
return
|
||||
case <-gc.stopChannel:
|
||||
if gc.workerPool != nil {
|
||||
gc.workerPool.Release()
|
||||
}
|
||||
|
||||
close(gc.eventChan)
|
||||
|
||||
gc.log.Debug(logs.ShardGCIsStopped)
|
||||
gc.releaseResources()
|
||||
return
|
||||
case <-timer.C:
|
||||
startedAt := time.Now()
|
||||
|
|
|
@ -19,6 +19,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
|||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
p.taskPool.Release()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
@ -36,6 +37,7 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
|||
for i := range addrs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
p.taskPool.Release()
|
||||
return
|
||||
default:
|
||||
addr := addrs[i]
|
||||
|
|
Loading…
Reference in a new issue