gc: Skip received event if previous still processing #1228

Closed
acid-ant wants to merge 1 commit from acid-ant/frostfs-node:bugfix/gc-unblock-evnt-handler into master
3 changed files with 29 additions and 19 deletions

View file

@ -251,8 +251,10 @@ const (
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode" ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode" ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel" ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
ShardEventProcessingInProgress = "event processing is in progress, skip the received" ShardEventProcessingInProgress = "event processing is in progress, skip the received"
ShardEventProcessingStart = "event processing is started"
ShardEventProcessingEnd = "event processing is completed"
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
ShardStopEventListenerByContext = "stop event listener by context" ShardStopEventListenerByContext = "stop event listener by context"
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool" ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
ShardGCIsStopped = "GC is stopped" ShardGCIsStopped = "GC is stopped"

View file

@ -158,7 +158,6 @@ func (s *Shard) Init(ctx context.Context) error {
eventChan: make(chan Event), eventChan: make(chan Event),
mEventHandler: map[eventType]*eventHandlers{ mEventHandler: map[eventType]*eventHandlers{
eventNewEpoch: { eventNewEpoch: {
cancelFunc: func() {},
handlers: []eventHandler{ handlers: []eventHandler{
s.collectExpiredLocks, s.collectExpiredLocks,
s.collectExpiredObjects, s.collectExpiredObjects,

View file

@ -3,6 +3,7 @@ package shard
import ( import (
"context" "context"
"sync" "sync"
"sync/atomic"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -61,11 +62,11 @@ func EventNewEpoch(e uint64) Event {
type eventHandler func(context.Context, Event) type eventHandler func(context.Context, Event)
type eventHandlers struct { type eventHandlers struct {
prevGroup sync.WaitGroup wg sync.WaitGroup
cancelFunc context.CancelFunc
handlers []eventHandler handlers []eventHandler
runningHandlers atomic.Int32
} }
type gcRunResult struct { type gcRunResult struct {
@ -182,33 +183,36 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
if !ok { if !ok {
return return
} }
if !v.runningHandlers.CompareAndSwap(0, int32(len(v.handlers))) {
v.cancelFunc() gc.log.Warn(logs.ShardEventProcessingInProgress)
v.prevGroup.Wait() return
}
var runCtx context.Context addAndLog := func(delta int32) {
runCtx, v.cancelFunc = context.WithCancel(ctx) if v.runningHandlers.Add(delta) == 0 {
gc.log.Debug(logs.ShardEventProcessingEnd)
v.prevGroup.Add(len(v.handlers)) }
}
gc.log.Debug(logs.ShardEventProcessingStart)
for i := range v.handlers { for i := range v.handlers {
select { select {
case <-ctx.Done(): case <-ctx.Done():
addAndLog(int32(i - len(v.handlers)))
return return
default: default:
} }
h := v.handlers[i] h := v.handlers[i]
v.wg.Add(1)
err := gc.workerPool.Submit(func() { err := gc.workerPool.Submit(func() {
defer v.prevGroup.Done() h(ctx, event)
h(runCtx, event) addAndLog(-1)
v.wg.Done()
}) })
if err != nil { if err != nil {
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool, gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
addAndLog(-1)
v.prevGroup.Done() v.wg.Done()
} }
} }
} }
@ -264,7 +268,12 @@ func (gc *gc) stop() {
}) })
gc.log.Info(logs.ShardWaitingForGCWorkersToStop) gc.log.Info(logs.ShardWaitingForGCWorkersToStop)
// Order is important here, at first we need to wait for listener ends,
// and then for handlers, which spawn inside the listener.
gc.wg.Wait() gc.wg.Wait()
for _, h := range gc.mEventHandler {
h.wg.Wait()
}
} }
// iterates over metabase and deletes objects // iterates over metabase and deletes objects