package notificator

import (
	"context"
	"sync"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
	"go.uber.org/zap"
)

// handlerLimiter serializes handling of new epoch events: submitted work is
// executed one item at a time by a single goroutine, and submitting a new
// item cancels the context of the previously submitted one.
type handlerLimiter struct {
	// ctx is the parent of every per-work context; its cancellation stops
	// the worker goroutine.
	ctx     context.Context
	log     *zap.Logger
	handler NewEpochHandler
	// work is an unbuffered queue of closures consumed by start.
	work chan func()

	// mu guards cancelCurrent.
	mu sync.Mutex
	// cancelCurrent cancels the context handed to the most recently
	// submitted work. It is never nil (initialized with a no-op).
	cancelCurrent context.CancelFunc
}

// newHandlerLimiter creates a handlerLimiter and launches its worker
// goroutine. The goroutine runs until ctx is cancelled.
func newHandlerLimiter(ctx context.Context, handler NewEpochHandler, log *zap.Logger) *handlerLimiter {
	hl := &handlerLimiter{
		ctx:           ctx,
		log:           log,
		handler:       handler,
		work:          make(chan func()),
		cancelCurrent: func() {},
	}

	go hl.start(ctx)

	return hl
}

// start is the worker loop: it runs submitted closures sequentially and
// returns once ctx is cancelled.
func (h *handlerLimiter) start(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			// The channel is intentionally NOT closed here. This goroutine
			// is the receiver; Handler is the sender and may be invoked
			// concurrently with (or after) shutdown, and sending on a closed
			// channel panics. Senders observe shutdown via h.ctx instead.
			return
		case work := <-h.work:
			work()
		}
	}
}

// replaceCurrentWorkContext cancels the context of the previously submitted
// work and returns a fresh cancellable context derived from ctx, remembering
// its cancel function for the next replacement.
func (h *handlerLimiter) replaceCurrentWorkContext(ctx context.Context) (workCtx context.Context) {
	h.mu.Lock()
	defer h.mu.Unlock()

	h.cancelCurrent()
	workCtx, h.cancelCurrent = context.WithCancel(ctx)
	return workCtx
}

// Handler processes an incoming event. Events that are not NewEpochEvent are
// ignored. For a NewEpochEvent it cancels the previously submitted work,
// then blocks until the worker goroutine accepts the new work or the
// limiter's context is cancelled, in which case the work is dropped.
func (h *handlerLimiter) Handler(e event.Event) {
	ee, ok := e.(NewEpochEvent)
	if !ok {
		return
	}

	workCtx := h.replaceCurrentWorkContext(h.ctx)
	h.log.Debug(logs.NewEpochWasTriggered, zap.Uint64("epoch", ee.Epoch))

	select {
	case h.work <- func() {
		h.handler(workCtx, ee)
	}:
	case <-h.ctx.Done():
		// The worker has stopped (or is stopping); nobody will receive the
		// work, so return instead of blocking forever. workCtx is a child of
		// h.ctx and is therefore already cancelled.
	}
}