forked from TrueCloudLab/frostfs-node
[#1228] gc: Skip received event if previous still processing
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
parent
3bf6e6dde6
commit
aeafa97f5a
3 changed files with 29 additions and 19 deletions
|
@ -251,8 +251,10 @@ const (
|
|||
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
|
||||
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
|
||||
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
|
||||
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
|
||||
ShardEventProcessingInProgress = "event processing is in progress, skip the received"
|
||||
ShardEventProcessingStart = "event processing is started"
|
||||
ShardEventProcessingEnd = "event processing is completed"
|
||||
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
|
||||
ShardStopEventListenerByContext = "stop event listener by context"
|
||||
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
|
||||
ShardGCIsStopped = "GC is stopped"
|
||||
|
|
|
@ -158,7 +158,6 @@ func (s *Shard) Init(ctx context.Context) error {
|
|||
eventChan: make(chan Event),
|
||||
mEventHandler: map[eventType]*eventHandlers{
|
||||
eventNewEpoch: {
|
||||
cancelFunc: func() {},
|
||||
handlers: []eventHandler{
|
||||
s.collectExpiredLocks,
|
||||
s.collectExpiredObjects,
|
||||
|
|
|
@ -3,6 +3,7 @@ package shard
|
|||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
|
@ -61,11 +62,11 @@ func EventNewEpoch(e uint64) Event {
|
|||
type eventHandler func(context.Context, Event)
|
||||
|
||||
type eventHandlers struct {
|
||||
prevGroup sync.WaitGroup
|
||||
|
||||
cancelFunc context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
handlers []eventHandler
|
||||
|
||||
runningHandlers atomic.Int32
|
||||
}
|
||||
|
||||
type gcRunResult struct {
|
||||
|
@ -182,33 +183,36 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
|
|||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
v.cancelFunc()
|
||||
v.prevGroup.Wait()
|
||||
|
||||
var runCtx context.Context
|
||||
runCtx, v.cancelFunc = context.WithCancel(ctx)
|
||||
|
||||
v.prevGroup.Add(len(v.handlers))
|
||||
|
||||
if !v.runningHandlers.CompareAndSwap(0, int32(len(v.handlers))) {
|
||||
gc.log.Warn(logs.ShardEventProcessingInProgress)
|
||||
return
|
||||
}
|
||||
addAndLog := func(delta int32) {
|
||||
if v.runningHandlers.Add(delta) == 0 {
|
||||
gc.log.Debug(logs.ShardEventProcessingEnd)
|
||||
}
|
||||
}
|
||||
gc.log.Debug(logs.ShardEventProcessingStart)
|
||||
for i := range v.handlers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
addAndLog(int32(i - len(v.handlers)))
|
||||
return
|
||||
default:
|
||||
}
|
||||
h := v.handlers[i]
|
||||
|
||||
v.wg.Add(1)
|
||||
err := gc.workerPool.Submit(func() {
|
||||
defer v.prevGroup.Done()
|
||||
h(runCtx, event)
|
||||
h(ctx, event)
|
||||
addAndLog(-1)
|
||||
v.wg.Done()
|
||||
})
|
||||
if err != nil {
|
||||
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
v.prevGroup.Done()
|
||||
addAndLog(-1)
|
||||
v.wg.Done()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -264,7 +268,12 @@ func (gc *gc) stop() {
|
|||
})
|
||||
|
||||
gc.log.Info(logs.ShardWaitingForGCWorkersToStop)
|
||||
// Order is important here, at first we need to wait for listener ends,
|
||||
// and then for handlers, which spawn inside the listener.
|
||||
gc.wg.Wait()
|
||||
for _, h := range gc.mEventHandler {
|
||||
h.wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
// iterates over metabase and deletes objects
|
||||
|
|
Loading…
Reference in a new issue