gc: Skip received event if previous still processing #1228
3 changed files with 29 additions and 19 deletions
|
@ -251,8 +251,10 @@ const (
|
||||||
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
|
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
|
||||||
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
|
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
|
||||||
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
|
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
|
||||||
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
|
|
||||||
ShardEventProcessingInProgress = "event processing is in progress, skip the received"
|
ShardEventProcessingInProgress = "event processing is in progress, skip the received"
|
||||||
|
ShardEventProcessingStart = "event processing is started"
|
||||||
|
ShardEventProcessingEnd = "event processing is completed"
|
||||||
|
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
|
||||||
ShardStopEventListenerByContext = "stop event listener by context"
|
ShardStopEventListenerByContext = "stop event listener by context"
|
||||||
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
|
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
|
||||||
ShardGCIsStopped = "GC is stopped"
|
ShardGCIsStopped = "GC is stopped"
|
||||||
|
|
|
@ -158,7 +158,6 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
eventChan: make(chan Event),
|
eventChan: make(chan Event),
|
||||||
mEventHandler: map[eventType]*eventHandlers{
|
mEventHandler: map[eventType]*eventHandlers{
|
||||||
eventNewEpoch: {
|
eventNewEpoch: {
|
||||||
cancelFunc: func() {},
|
|
||||||
handlers: []eventHandler{
|
handlers: []eventHandler{
|
||||||
s.collectExpiredLocks,
|
s.collectExpiredLocks,
|
||||||
s.collectExpiredObjects,
|
s.collectExpiredObjects,
|
||||||
|
|
|
@ -3,6 +3,7 @@ package shard
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -61,11 +62,11 @@ func EventNewEpoch(e uint64) Event {
|
||||||
type eventHandler func(context.Context, Event)
|
type eventHandler func(context.Context, Event)
|
||||||
|
|
||||||
type eventHandlers struct {
|
type eventHandlers struct {
|
||||||
prevGroup sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
||||||
cancelFunc context.CancelFunc
|
|
||||||
|
|
||||||
handlers []eventHandler
|
handlers []eventHandler
|
||||||
|
|
||||||
|
runningHandlers atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
type gcRunResult struct {
|
type gcRunResult struct {
|
||||||
|
@ -182,33 +183,36 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if !v.runningHandlers.CompareAndSwap(0, int32(len(v.handlers))) {
|
||||||
v.cancelFunc()
|
gc.log.Warn(logs.ShardEventProcessingInProgress)
|
||||||
v.prevGroup.Wait()
|
return
|
||||||
|
}
|
||||||
var runCtx context.Context
|
addAndLog := func(delta int32) {
|
||||||
runCtx, v.cancelFunc = context.WithCancel(ctx)
|
if v.runningHandlers.Add(delta) == 0 {
|
||||||
|
gc.log.Debug(logs.ShardEventProcessingEnd)
|
||||||
v.prevGroup.Add(len(v.handlers))
|
}
|
||||||
|
}
|
||||||
|
gc.log.Debug(logs.ShardEventProcessingStart)
|
||||||
for i := range v.handlers {
|
for i := range v.handlers {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
addAndLog(int32(i - len(v.handlers)))
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
h := v.handlers[i]
|
h := v.handlers[i]
|
||||||
|
v.wg.Add(1)
|
||||||
err := gc.workerPool.Submit(func() {
|
err := gc.workerPool.Submit(func() {
|
||||||
defer v.prevGroup.Done()
|
h(ctx, event)
|
||||||
h(runCtx, event)
|
addAndLog(-1)
|
||||||
|
v.wg.Done()
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,
|
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,
|
||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
addAndLog(-1)
|
||||||
v.prevGroup.Done()
|
v.wg.Done()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -264,7 +268,12 @@ func (gc *gc) stop() {
|
||||||
})
|
})
|
||||||
|
|
||||||
gc.log.Info(logs.ShardWaitingForGCWorkersToStop)
|
gc.log.Info(logs.ShardWaitingForGCWorkersToStop)
|
||||||
|
// Order is important here, at first we need to wait for listener ends,
|
||||||
|
// and then for handlers, which spawn inside the listener.
|
||||||
gc.wg.Wait()
|
gc.wg.Wait()
|
||||||
|
for _, h := range gc.mEventHandler {
|
||||||
|
h.wg.Wait()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterates over metabase and deletes objects
|
// iterates over metabase and deletes objects
|
||||||
|
|
Loading…
Reference in a new issue