gc: Skip received event if previous still processing #1228
|
@ -251,8 +251,10 @@ const (
|
||||||
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
|
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
|
||||||
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
|
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
|
||||||
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
|
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
|
||||||
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
|
|
||||||
ShardEventProcessingInProgress = "event processing is in progress, skip the received"
|
ShardEventProcessingInProgress = "event processing is in progress, skip the received"
|
||||||
|
ShardEventProcessingStart = "event processing is started"
|
||||||
|
ShardEventProcessingEnd = "event processing is completed"
|
||||||
|
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
|
||||||
ShardStopEventListenerByContext = "stop event listener by context"
|
ShardStopEventListenerByContext = "stop event listener by context"
|
||||||
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
|
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
|
||||||
ShardGCIsStopped = "GC is stopped"
|
ShardGCIsStopped = "GC is stopped"
|
||||||
|
|
|
@ -158,7 +158,6 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
eventChan: make(chan Event),
|
eventChan: make(chan Event),
|
||||||
mEventHandler: map[eventType]*eventHandlers{
|
mEventHandler: map[eventType]*eventHandlers{
|
||||||
eventNewEpoch: {
|
eventNewEpoch: {
|
||||||
cancelFunc: func() {},
|
|
||||||
handlers: []eventHandler{
|
handlers: []eventHandler{
|
||||||
s.collectExpiredLocks,
|
s.collectExpiredLocks,
|
||||||
s.collectExpiredObjects,
|
s.collectExpiredObjects,
|
||||||
|
|
|
@ -3,6 +3,7 @@ package shard
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -61,11 +62,11 @@ func EventNewEpoch(e uint64) Event {
|
||||||
type eventHandler func(context.Context, Event)
|
type eventHandler func(context.Context, Event)
|
||||||
|
|
||||||
type eventHandlers struct {
|
type eventHandlers struct {
|
||||||
prevGroup sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
||||||
cancelFunc context.CancelFunc
|
|
||||||
|
|
||||||
handlers []eventHandler
|
handlers []eventHandler
|
||||||
|
|
||||||
|
runningHandlers atomic.Int32
|
||||||
}
|
}
|
||||||
|
|
||||||
type gcRunResult struct {
|
type gcRunResult struct {
|
||||||
|
@ -182,33 +183,36 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
|
if !v.runningHandlers.CompareAndSwap(0, int32(len(v.handlers))) {
|
||||||
v.cancelFunc()
|
gc.log.Warn(logs.ShardEventProcessingInProgress)
|
||||||
v.prevGroup.Wait()
|
return
|
||||||
|
}
|
||||||
var runCtx context.Context
|
addAndLog := func(delta int32) {
|
||||||
runCtx, v.cancelFunc = context.WithCancel(ctx)
|
if v.runningHandlers.Add(delta) == 0 {
|
||||||
|
gc.log.Debug(logs.ShardEventProcessingEnd)
|
||||||
v.prevGroup.Add(len(v.handlers))
|
}
|
||||||
|
}
|
||||||
|
gc.log.Debug(logs.ShardEventProcessingStart)
|
||||||
fyrchik
commented
We have this We have this `Add(-1) + log` in 3 different places, that's 12 lines for one debug log. Can it be made simpler?
acid-ant
commented
You are right, wrapped this part into the function. You are right, wrapped this part into the function.
fyrchik
commented
Anyway, how is this solution different from non-blocking send in the notification channel in the epoch handler? Anyway, how is this solution different from non-blocking send in the notification channel in the epoch handler?
acid-ant
commented
You are talking about buffered channel? You are talking about buffered channel?
fyrchik
commented
It doesn't matter, our handler hangs, with non-blocking send it won't. It doesn't matter, our handler hangs, with non-blocking send it won't.
acid-ant
commented
It hangs only for submitting into the pool. You think we should reduce this time too? It hangs only for submitting into the pool. You think we should reduce this time too?
acid-ant
commented
fyrchik
commented
Submit to the pool stops because all workers in it are busy. #1238 solves this problem completely, do we need this PR? Submit to the pool stops because all workers in it are busy.
All workers are busy because they block on channel send.
#1238 solves this problem completely, do we need this PR?
acid-ant
commented
Looks like we can close it because we have only one event type, and it is not a big deal for us to wait for a completion, but not for a new notification. Looks like we can close it because we have only one event type, and it is not a big deal for us to wait for a completion, but not for a new notification.
|
|||||||
for i := range v.handlers {
|
for i := range v.handlers {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
addAndLog(int32(i - len(v.handlers)))
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
h := v.handlers[i]
|
h := v.handlers[i]
|
||||||
|
v.wg.Add(1)
|
||||||
err := gc.workerPool.Submit(func() {
|
err := gc.workerPool.Submit(func() {
|
||||||
aarifullin marked this conversation as resolved
Outdated
aarifullin
commented
Let Let `v.runningHandlers = 1` before this assignment. When it's decremented, it becomes `0` and the next coming event is able to perform `CompareAndSwap` above. As soon as the next event passes through this gate, it starts iterating over `v.handlers` and increments wait group's counter `v.wg.Add(1)`. Can't this cause a hang in `v.wg.Done()` within submitted handler that's not finished yet?
acid-ant
commented
`v.wg.Done()` is equal to `v.wg.Add(-1)`, and according to source code is an action upon `atomic`.
As I understand, you are talking about situation, when we are able to submit in a pool, but there are no ready workers to process submitted action, because previous submitted action unblocked processing, but still in a working queue.
I don't think it is a problem.
aarifullin
commented
Sorry, I quickly glanced at the code and I mistook Sorry, I quickly glanced at the code and I mistook `Done` for `Wait`.
Yes, in this case I believe we have no problem
|
|||||||
defer v.prevGroup.Done()
|
h(ctx, event)
|
||||||
h(runCtx, event)
|
addAndLog(-1)
|
||||||
|
v.wg.Done()
|
||||||
})
|
})
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
It is not obvious, why this code should not be executed if we exit prematurely. It is not obvious, why this code should not be executed if we exit prematurely.
Could you elaborate a bit on why this is the right behaviour?
acid-ant
commented
I don't like this approach too. It was compromise, because we are handling only one type of events. I don't like this approach too. It was compromise, because we are handling only one type of events.
Refactored a bit to avoid leaking of the `routines`.
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
fyrchik
commented
Also, it seems this goroutine can leak, who waits until it is finished? Also, it seems this goroutine can leak, who waits until it is finished?
|
|||||||
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,
|
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,
|
||||||
dstepanov-yadro
commented
Why waiting is performed by separate goroutine? Why waiting is performed by separate goroutine?
acid-ant
commented
Refactored this part. Please review. Refactored this part. Please review.
|
|||||||
zap.String("error", err.Error()),
|
zap.String("error", err.Error()),
|
||||||
)
|
)
|
||||||
|
addAndLog(-1)
|
||||||
v.prevGroup.Done()
|
v.wg.Done()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -264,7 +268,12 @@ func (gc *gc) stop() {
|
||||||
})
|
})
|
||||||
|
|
||||||
gc.log.Info(logs.ShardWaitingForGCWorkersToStop)
|
gc.log.Info(logs.ShardWaitingForGCWorkersToStop)
|
||||||
|
// Order is important here, at first we need to wait for listener ends,
|
||||||
|
// and then for handlers, which spawn inside the listener.
|
||||||
gc.wg.Wait()
|
gc.wg.Wait()
|
||||||
|
for _, h := range gc.mEventHandler {
|
||||||
|
h.wg.Wait()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterates over metabase and deletes objects
|
// iterates over metabase and deletes objects
|
||||||
|
|
Just to clarify: do we have only 1 event type?
Right now, yes - only for new epoch.