[#378] shard: Control the completion of all handlers of the previous event

Group handlers of the particular event to a WaitGroup and wait for it before
the next event handling. This will ensure that all handlers complete and
prevent potential conflicts between past and present jobs.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
remotes/KirillovDenis/release/v0.21.1
Leonard Lyubich 2021-02-16 15:17:37 +03:00 committed by Alex Vanin
parent 18add24727
commit a9a1acc880
1 changed files with 9 additions and 0 deletions

View File

@ -2,6 +2,7 @@ package shard
import ( import (
"context" "context"
"sync"
"time" "time"
"github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/object"
@ -25,6 +26,8 @@ const (
type eventHandler func(context.Context, Event) type eventHandler func(context.Context, Event)
type eventHandlers struct { type eventHandlers struct {
prevGroup sync.WaitGroup
cancelFunc context.CancelFunc cancelFunc context.CancelFunc
handlers []eventHandler handlers []eventHandler
@ -97,18 +100,24 @@ func (gc *gc) listenEvents() {
} }
v.cancelFunc() v.cancelFunc()
v.prevGroup.Wait()
var ctx context.Context var ctx context.Context
ctx, v.cancelFunc = context.WithCancel(context.Background()) ctx, v.cancelFunc = context.WithCancel(context.Background())
v.prevGroup.Add(len(v.handlers))
for _, h := range v.handlers { for _, h := range v.handlers {
err := gc.workerPool.Submit(func() { err := gc.workerPool.Submit(func() {
h(ctx, event) h(ctx, event)
v.prevGroup.Done()
}) })
if err != nil { if err != nil {
gc.log.Warn("could not submit GC job to worker pool", gc.log.Warn("could not submit GC job to worker pool",
zap.String("error", err.Error()), zap.String("error", err.Error()),
) )
v.prevGroup.Done()
} }
} }
} }