71 lines
1.4 KiB
Go
71 lines
1.4 KiB
Go
package notificator
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/event"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-lifecycler/internal/logs"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// handlerLimiter is a limiter to make some works to be sequential
|
|
// and interrupt previous one if new one is submitted.
|
|
type handlerLimiter struct {
|
|
ctx context.Context
|
|
log *zap.Logger
|
|
handler NewEpochHandler
|
|
work chan func()
|
|
|
|
mu sync.Mutex
|
|
cancelCurrent context.CancelFunc
|
|
}
|
|
|
|
func newHandlerLimiter(ctx context.Context, handler NewEpochHandler, log *zap.Logger) *handlerLimiter {
|
|
hl := &handlerLimiter{
|
|
ctx: ctx,
|
|
log: log,
|
|
handler: handler,
|
|
work: make(chan func()),
|
|
cancelCurrent: func() {},
|
|
}
|
|
|
|
go hl.start(ctx)
|
|
|
|
return hl
|
|
}
|
|
|
|
func (h *handlerLimiter) start(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
close(h.work)
|
|
return
|
|
case work := <-h.work:
|
|
work()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *handlerLimiter) replaceCurrentWorkContext(ctx context.Context) (workCtx context.Context) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
h.cancelCurrent()
|
|
workCtx, h.cancelCurrent = context.WithCancel(ctx)
|
|
|
|
return workCtx
|
|
}
|
|
|
|
func (h *handlerLimiter) Handler(e event.Event) {
|
|
ee, ok := e.(NewEpochEvent)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
workCtx := h.replaceCurrentWorkContext(h.ctx)
|
|
h.log.Debug(logs.NewEpochWasTriggered, zap.Uint64("epoch", ee.Epoch))
|
|
h.work <- func() {
|
|
h.handler(workCtx, ee)
|
|
}
|
|
}
|