gc: Skip received event if previous still processing #1228

Closed
acid-ant wants to merge 1 commit from acid-ant/frostfs-node:bugfix/gc-unblock-evnt-handler into master
3 changed files with 29 additions and 19 deletions

View file

@ -251,8 +251,10 @@ const (
ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode" ShardCantOpenMetabaseMoveToADegradedMode = "can't open metabase, move to a degraded mode"
ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode" ShardCantInitializeMetabaseMoveToADegradedreadonlyMode = "can't initialize metabase, move to a degraded-read-only mode"
ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel" ShardStopEventListenerByClosedEventChannel = "stop event listener by closed `event` channel"
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
ShardEventProcessingInProgress = "event processing is in progress, skip the received" ShardEventProcessingInProgress = "event processing is in progress, skip the received"
ShardEventProcessingStart = "event processing is started"
ShardEventProcessingEnd = "event processing is completed"
ShardStopEventListenerByClosedStopChannel = "stop event listener by closed `stop` channel"
ShardStopEventListenerByContext = "stop event listener by context" ShardStopEventListenerByContext = "stop event listener by context"
ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool" ShardCouldNotSubmitGCJobToWorkerPool = "could not submit GC job to worker pool"
ShardGCIsStopped = "GC is stopped" ShardGCIsStopped = "GC is stopped"

View file

@ -158,7 +158,6 @@ func (s *Shard) Init(ctx context.Context) error {
eventChan: make(chan Event), eventChan: make(chan Event),
mEventHandler: map[eventType]*eventHandlers{ mEventHandler: map[eventType]*eventHandlers{
eventNewEpoch: { eventNewEpoch: {
cancelFunc: func() {},
handlers: []eventHandler{ handlers: []eventHandler{
s.collectExpiredLocks, s.collectExpiredLocks,
s.collectExpiredObjects, s.collectExpiredObjects,

View file

@ -3,6 +3,7 @@ package shard
import ( import (
"context" "context"
"sync" "sync"
"sync/atomic"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
@ -61,11 +62,11 @@ func EventNewEpoch(e uint64) Event {
type eventHandler func(context.Context, Event) type eventHandler func(context.Context, Event)
type eventHandlers struct { type eventHandlers struct {
prevGroup sync.WaitGroup wg sync.WaitGroup
cancelFunc context.CancelFunc
handlers []eventHandler handlers []eventHandler
runningHandlers atomic.Int32
} }
type gcRunResult struct { type gcRunResult struct {
@ -182,33 +183,36 @@ func (gc *gc) handleEvent(ctx context.Context, event Event) {
if !ok { if !ok {
return return
} }
fyrchik marked this conversation as resolved Outdated

Just to clarify: do we have only 1 event type?

Just to clarify: do we have only 1 event type?

Right now, yes - only for new epoch.

Right now, yes - only for new epoch.
if !v.runningHandlers.CompareAndSwap(0, int32(len(v.handlers))) {
v.cancelFunc() gc.log.Warn(logs.ShardEventProcessingInProgress)
v.prevGroup.Wait() return
}
var runCtx context.Context addAndLog := func(delta int32) {
runCtx, v.cancelFunc = context.WithCancel(ctx) if v.runningHandlers.Add(delta) == 0 {
gc.log.Debug(logs.ShardEventProcessingEnd)
v.prevGroup.Add(len(v.handlers)) }
}
gc.log.Debug(logs.ShardEventProcessingStart)

We have this Add(-1) + log in 3 different places, that's 12 lines for one debug log. Can it be made simpler?

We have this `Add(-1) + log` in 3 different places, that's 12 lines for one debug log. Can it be made simpler?

You are right, wrapped this part into the function.

You are right, wrapped this part into the function.

Anyway, how is this solution different from non-blocking send in the notification channel in the epoch handler?

Anyway, how is this solution different from non-blocking send in the notification channel in the epoch handler?

You are talking about buffered channel?

You are talking about buffered channel?

It doesn't matter, our handler hangs, with non-blocking send it won't.

It doesn't matter, our handler hangs, with non-blocking send it won't.

It hangs only for submitting into the pool. You think we should reduce this time too?

It hangs only for submitting into the pool. You think we should reduce this time too?

@fyrchik, created #1238.

@fyrchik, created https://git.frostfs.info/TrueCloudLab/frostfs-node/pulls/1238.

Submit to the pool stops because all workers in it are busy.
All workers are busy because they block on channel send.

#1238 solves this problem completely, do we need this PR?

Submit to the pool stops because all workers in it are busy. All workers are busy because they block on channel send. #1238 solves this problem completely, do we need this PR?

Looks like we can close it because we have only one event type, and it is not a big deal for us to wait for a completion, but not for a new notification.

Looks like we can close it because we have only one event type, and it is not a big deal for us to wait for a completion, but not for a new notification.
for i := range v.handlers { for i := range v.handlers {
select { select {
case <-ctx.Done(): case <-ctx.Done():
addAndLog(int32(i - len(v.handlers)))
return return
default: default:
} }
h := v.handlers[i] h := v.handlers[i]
v.wg.Add(1)
err := gc.workerPool.Submit(func() { err := gc.workerPool.Submit(func() {
aarifullin marked this conversation as resolved Outdated

Let v.runningHandlers = 1 before this assignment. When it's decremented, it becomes 0 and the next coming event is able to perform CompareAndSwap above. As soon as the next event passes through this gate, it starts iterating over v.handlers and increments wait group's counter v.wg.Add(1). Can't this cause a hang in v.wg.Done() within submitted handler that's not finished yet?

Let `v.runningHandlers = 1` before this assignment. When it's decremented, it becomes `0` and the next coming event is able to perform `CompareAndSwap` above. As soon as the next event passes through this gate, it starts iterating over `v.handlers` and increments wait group's counter `v.wg.Add(1)`. Can't this cause a hang in `v.wg.Done()` within submitted handler that's not finished yet?

v.wg.Done() is equal to v.wg.Add(-1), and according to source code is an action upon atomic.
As I understand, you are talking about situation, when we are able to submit in a pool, but there are no ready workers to process submitted action, because previous submitted action unblocked processing, but still in a working queue.
I don't think it is a problem.

`v.wg.Done()` is equal to `v.wg.Add(-1)`, and according to source code is an action upon `atomic`. As I understand, you are talking about situation, when we are able to submit in a pool, but there are no ready workers to process submitted action, because previous submitted action unblocked processing, but still in a working queue. I don't think it is a problem.

Sorry, I quickly glanced at the code and I mistook Done for Wait.
Yes, in this case I believe we have no problem

Sorry, I quickly glanced at the code and I mistook `Done` for `Wait`. Yes, in this case I believe we have no problem
defer v.prevGroup.Done() h(ctx, event)
h(runCtx, event) addAndLog(-1)
v.wg.Done()
}) })
fyrchik marked this conversation as resolved Outdated

It is not obvious, why this code should not be executed if we exit prematurely.
Could you elaborate a bit on why this is the right behaviour?

It is not obvious, why this code should not be executed if we exit prematurely. Could you elaborate a bit on why this is the right behaviour?

I don't like this approach too. It was compromise, because we are handling only one type of events.
Refactored a bit to avoid leaking of the routines.

I don't like this approach too. It was compromise, because we are handling only one type of events. Refactored a bit to avoid leaking of the `routines`.
if err != nil { if err != nil {

Also, it seems this goroutine can leak, who waits until it is finished?

Also, it seems this goroutine can leak, who waits until it is finished?
gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool, gc.log.Warn(logs.ShardCouldNotSubmitGCJobToWorkerPool,

Why waiting is performed by separate goroutine?

Why waiting is performed by separate goroutine?

Refactored this part. Please review.

Refactored this part. Please review.
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
addAndLog(-1)
v.prevGroup.Done() v.wg.Done()
} }
} }
} }
@ -264,7 +268,12 @@ func (gc *gc) stop() {
}) })
gc.log.Info(logs.ShardWaitingForGCWorkersToStop) gc.log.Info(logs.ShardWaitingForGCWorkersToStop)
// Order is important here, at first we need to wait for listener ends,
// and then for handlers, which spawn inside the listener.
gc.wg.Wait() gc.wg.Wait()
for _, h := range gc.mEventHandler {
h.wg.Wait()
}
} }
// iterates over metabase and deletes objects // iterates over metabase and deletes objects